This is an automated email from the ASF dual-hosted git repository.

style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git


The following commit(s) were added to refs/heads/master by this push:
     new 9279626  Implement MongoDBArtifactStore (#4963)
9279626 is described below

commit 927962692e45fc7276ccb7902fd3a111daa54885
Author: jiangpch <[email protected]>
AuthorDate: Wed Jun 9 08:54:26 2021 +0800

    Implement MongoDBArtifactStore (#4963)
    
    * Implement MongoDBActivationStore
    
    Co-authored-by: Chetan Mehrotra <[email protected]>
    
    * Upgrade mongo-scala to 2.7.0
    
    * Fix test
    
    * Add license headers
    
    * Add default value for mongodb_connect_string
    
    * Rename graph stage classes used for mongodb gridfs
    
    * Update readme
    
    * Update based on comments
    
    * Fix typo and update README
    
    * Rename db.backend to db.artifact_store.backend
    
    Co-authored-by: Chetan Mehrotra <[email protected]>
---
 ansible/README.md                                  |  37 ++
 ansible/group_vars/all                             |   8 +
 ansible/{tasks/db/checkDb.yml => initMongoDB.yml}  |  33 +-
 ansible/library/mongodb.py                         | 283 +++++++++
 ansible/{tasks/db/checkDb.yml => mongodb.yml}      |  30 +-
 ansible/roles/controller/tasks/deploy.yml          |  13 +
 ansible/roles/invoker/tasks/deploy.yml             |  13 +
 .../checkDb.yml => roles/mongodb/tasks/clean.yml}  |  18 +-
 .../checkDb.yml => roles/mongodb/tasks/deploy.yml} |  32 +-
 .../checkDb.yml => roles/mongodb/tasks/main.yml}   |  21 +-
 ansible/tasks/db/checkDb.yml                       |   4 +
 ansible/templates/whisk.conf.j2                    |  10 +
 common/scala/build.gradle                          |   2 +
 common/scala/src/main/resources/application.conf   |   7 +
 .../org/apache/openwhisk/core/WhiskConfig.scala    |   1 +
 .../database/mongodb/MongoDBArtifactStore.scala    | 661 +++++++++++++++++++++
 .../mongodb/MongoDBArtifactStoreProvider.scala     |  99 +++
 .../database/mongodb/MongoDBAsyncStreamSink.scala  | 122 ++++
 .../mongodb/MongoDBAsyncStreamSource.scala         | 104 ++++
 .../core/database/mongodb/MongoDBViewMapper.scala  | 224 +++++++
 tests/src/test/resources/application.conf.j2       |   5 +
 .../mongodb/MongoDBArtifactStoreTests.scala        |  26 +
 .../mongodb/MongoDBAsyncStreamGraphTests.scala     | 153 +++++
 .../mongodb/MongoDBAttachmentStoreTests.scala      |  33 +
 .../mongodb/MongoDBStoreBehaviorBase.scala         |  62 ++
 .../database/mongodb/MongoDBViewMapperTests.scala  | 256 ++++++++
 tools/build/README.md                              |   1 +
 tools/build/redo                                   |   7 +
 28 files changed, 2205 insertions(+), 60 deletions(-)

diff --git a/ansible/README.md b/ansible/README.md
index d08b688..7cc05f8 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -196,6 +196,43 @@ ansible-playbook -i environments/$ENVIRONMENT routemgmt.yml
 - To use the API Gateway, you'll need to run `apigateway.yml` and 
`routemgmt.yml`.
 - Use `ansible-playbook -i environments/$ENVIRONMENT openwhisk.yml` to avoid 
wiping the data store. This is useful to start OpenWhisk after restarting your 
Operating System.
 
+### Deploying Using MongoDB
+
+You can choose MongoDB instead of CouchDB as the database backend to store 
entities.
+
+- Deploy a MongoDB server (optional; intended for testing and development only, use an external MongoDB server in production).
+  You need to run `pip install pymongo` first.
+
+```
+ansible-playbook -i environments/<environment> mongodb.yml -e mongodb_data_volume="/tmp/mongo-data"
+```
+
+- Then execute
+
+```
+cd <openwhisk_home>
+./gradlew distDocker
+cd ansible
+ansible-playbook -i environments/<environment> initMongoDB.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017"
+ansible-playbook -i environments/<environment> apigateway.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017"
+ansible-playbook -i environments/<environment> openwhisk.yml -e mongodb_connect_string="mongodb://172.17.0.1:27017" -e db_artifact_backend="MongoDB"
+
+# installs a catalog of public packages and actions
+ansible-playbook -i environments/<environment> postdeploy.yml
+
+# to use the API gateway
+ansible-playbook -i environments/<environment> apigateway.yml
+ansible-playbook -i environments/<environment> routemgmt.yml
+```
+
+The available Ansible parameters are:
+```
+  mongodb:
+    connect_string: "{{ mongodb_connect_string }}"
+    database: "{{ mongodb_database | default('whisks') }}"
+    data_volume: "{{ mongodb_data_volume | default('mongo-data') }}"
+```
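+
+For example, to point the deployment at an external MongoDB server you can override these
+parameters on the command line (the host name below is illustrative):
+
+```
+ansible-playbook -i environments/<environment> openwhisk.yml -e mongodb_connect_string="mongodb://mongodb.example.com:27017" -e mongodb_database="whisks" -e db_artifact_backend="MongoDB"
+```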
+
 ### Using ElasticSearch to Store Activations
 
 You can use ElasticSearch (ES) to store activations separately while other 
entities remain stored in CouchDB. There is an Ansible playbook to setup a 
simple ES cluster for testing and development purposes.
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 1467639..79112a9 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -278,6 +278,8 @@ db:
     invoker:
       user: "{{ db_invoker_user | default(lookup('ini', 'db_username 
section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
       pass: "{{ db_invoker_pass | default(lookup('ini', 'db_password 
section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
+  artifact_store:
+    backend: "{{ db_artifact_backend | default('CouchDB') }}"
   activation_store:
     backend: "{{ db_activation_backend | default('CouchDB') }}"
   elasticsearch:
@@ -299,6 +301,10 @@ db:
       admin:
         username: "{{ elastic_username | default('admin') }}"
         password: "{{ elastic_password | default('admin') }}"
+  mongodb:
+    connect_string: "{{ mongodb_connect_string | 
default('mongodb://172.17.0.1:27017') }}"
+    database: "{{ mongodb_database | default('whisks') }}"
+    data_volume: "{{ mongodb_data_volume | default('mongo-data') }}"
 
 apigateway:
   port:
@@ -326,6 +332,8 @@ elasticsearch_connect_string: "{% set ret = [] %}\
                                {{ ret.append( hostvars[host].ansible_host + 
':' + ((db.elasticsearch.port+loop.index-1)|string) ) }}\
                                {% endfor %}\
                                {{ ret | join(',') }}"
+mongodb:
+  version: 4.4.0
 
 docker:
   # The user to install docker for. Defaults to the ansible user if not set. 
This will be the user who is able to run
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/initMongoDB.yml
similarity index 53%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/initMongoDB.yml
index 5962400..58672a6 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/initMongoDB.yml
@@ -15,16 +15,25 @@
 # limitations under the License.
 #
 ---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This playbook will initialize the immortal DBs in the database account.
+# This step is usually done only once per deployment.
 
-- name: check if {{ dbName }} with {{ db.provider }} exists
-  uri:
-    url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
-    method: HEAD
-    status_code: 200
-    user: "{{ dbUser }}"
-    password: "{{ dbPass }}"
-    force_basic_auth: yes
+- hosts: ansible
+  tasks:
+  - name: create necessary auth keys
+    mongodb:
+      connect_string: "{{ db.mongodb.connect_string }}"
+      database: "{{ db.mongodb.database }}"
+      collection: "whiskauth"
+      doc:
+        _id: "{{ item }}"
+        subject: "{{ item }}"
+        namespaces:
+          - name: "{{ item }}"
+            uuid: "{{ key.split(':')[0] }}"
+            key: "{{ key.split(':')[1] }}"
+      mode: "doc"
+      force_update: True
+    vars:
+      key: "{{ lookup('file', 'files/auth.{{ item }}') }}"
+    with_items: "{{ db.authkeys }}"
diff --git a/ansible/library/mongodb.py b/ansible/library/mongodb.py
new file mode 100644
index 0000000..fb610fa
--- /dev/null
+++ b/ansible/library/mongodb.py
@@ -0,0 +1,283 @@
+#!/usr/bin/python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+DOCUMENTATION = '''
+---
+module: mongodb
+short_description: A module which supports some simple operations on MongoDB.
+description:
+    - Supports adding users, inserting documents and creating indexes in MongoDB.
+options:
+    connect_string:
+        description:
+            - The URI of the MongoDB server
+        required: true
+    database:
+        description:
+            - The name of the database you want to manipulate
+        required: true
+    user:
+        description:
+            - The name of the user to add, required when using 'user' mode
+        required: false
+        default: null
+    password:
+        description:
+            - The password to use for the user, required when using 'user' mode
+        required: false
+        default: null
+    roles:
+        description:
+            - The roles of the user. A list of dicts, each requiring the fields 'db' and 'role'. Required when using 'user' mode
+        required: false
+        default: null
+    collection:
+        required: false
+        description:
+            - The name of the collection you want to manipulate, required when using 'doc' or 'index' mode
+    doc:
+        required: false
+        description:
+            - The document you want to insert into MongoDB, required when using 'doc' mode
+    indexes:
+        required: false
+        description:
+            - The indexes you want to create in MongoDB. A list of dicts (see the example below for usage). Required when using 'index' mode
+    force_update:
+        required: false
+        description:
+            - Whether to replace/update an existing user or doc instead of raising DuplicateKeyError, default is false
+    mode:
+        required: false
+        default: user
+        choices: ['user', 'doc', 'index']
+        description:
+            - Use 'user' mode to add a user, 'doc' mode to insert a document, and 'index' mode to create indexes
+
+requirements: [ "pymongo" ]
+author:
+    - "Jinag PengCheng"
+'''
+
+EXAMPLES = '''
+# add user
+- mongodb:
+    connect_string: mongodb://localhost:27017
+    database: admin
+    user: test
+    password: 123456
+    roles:
+      - db: test_database
+        role: read
+    force_update: true
+
+# add doc
+- mongodb:
+    connect_string: mongodb://localhost:27017
+    mode: doc
+    database: admin
+    collection: main
+    doc:
+      id: "id/document"
+      title: "the name of document"
+      content: "which doesn't matter"
+    force_update: true
+
+# add indexes
+- mongodb:
+    connect_string: mongodb://localhost:27017
+    mode: index
+    database: admin
+    collection: main
+    indexes:
+      - index:
+        - field: updated_at
+          direction: 1
+        - field: name
+          direction: -1
+        name: test-index
+        unique: true
+'''
+
+import traceback
+
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils._text import to_native
+
+try:
+    from pymongo import ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, 
HASHED, TEXT
+    from pymongo import IndexModel
+    from pymongo import MongoClient
+    from pymongo.errors import DuplicateKeyError
+except ImportError:
+    pass
+
+
+# =========================================
+# MongoDB module specific support methods.
+#
+
+class UnknownIndexPlugin(Exception):
+    pass
+
+
+def check_params(params, mode, module):
+    missed_params = []
+    for key in OPERATIONS[mode]['required']:
+        if params[key] is None:
+            missed_params.append(key)
+
+    if missed_params:
+        module.fail_json(msg="missing required arguments: %s" % 
(",".join(missed_params)))
+
+
+def _recreate_user(module, db, user, password, roles):
+    try:
+        db.command("dropUser", user)
+        db.command("createUser", user, pwd=password, roles=roles)
+    except Exception as e:
+        module.fail_json(msg='Unable to create user: %s' % to_native(e), 
exception=traceback.format_exc())
+
+
+
+def user(module, client, db_name, **kwargs):
+    roles = kwargs['roles']
+    if roles is None:
+        roles = []
+    db = client[db_name]
+
+    try:
+        db.command("createUser", kwargs['user'], pwd=kwargs['password'], 
roles=roles)
+    except DuplicateKeyError as e:
+        if kwargs['force_update']:
+            _recreate_user(module, db, kwargs['user'], kwargs['password'], 
roles)
+        else:
+            module.fail_json(msg='Unable to create user: %s' % to_native(e), 
exception=traceback.format_exc())
+    except Exception as e:
+        module.fail_json(msg='Unable to create user: %s' % to_native(e), 
exception=traceback.format_exc())
+
+    module.exit_json(changed=True, user=kwargs['user'])
+
+
+def doc(module, client, db_name, **kwargs):
+    coll = client[db_name][kwargs['collection']]
+    try:
+        coll.insert_one(kwargs['doc'])
+    except DuplicateKeyError as e:
+        if kwargs['force_update']:
+            try:
+                coll.replace_one({'_id': kwargs['doc']['_id']}, kwargs['doc'])
+            except Exception as e:
+                module.fail_json(msg='Unable to insert doc: %s' % 
to_native(e), exception=traceback.format_exc())
+        else:
+            module.fail_json(msg='Unable to insert doc: %s' % to_native(e), 
exception=traceback.format_exc())
+    except Exception as e:
+        module.fail_json(msg='Unable to insert doc: %s' % to_native(e), 
exception=traceback.format_exc())
+
+    kwargs['doc']['_id'] = str(kwargs['doc']['_id'])
+    module.exit_json(changed=True, doc=kwargs['doc'])
+
+
+def _clean_index_direction(direction):
+    if direction in ["1", "-1"]:
+        direction = int(direction)
+
+    if direction not in [ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE, 
HASHED, TEXT]:
+        raise UnknownIndexPlugin("Unable to create indexes: Unknown index 
plugin: %s" % direction)
+    return direction
+
+
+def _clean_index_options(options):
+    res = {}
+    supported_options = set(['name', 'unique', 'background', 'sparse', 
'bucketSize', 'min', 'max', 'expireAfterSeconds'])
+    for key in set(options.keys()).intersection(supported_options):
+        res[key] = options[key]
+        if key in ['min', 'max', 'bucketSize', 'expireAfterSeconds']:
+            res[key] = int(res[key])
+
+    return res
+
+
+def parse_indexes(idx):
+    keys = [(k['field'], _clean_index_direction(k['direction'])) for k in 
idx.pop('index')]
+    options = _clean_index_options(idx)
+    return IndexModel(keys, **options)
+
+
+def index(module, client, db_name, **kwargs):
+    # materialize the map generator; pymongo's create_indexes expects a list
+    parsed_indexes = list(map(parse_indexes, kwargs['indexes']))
+    try:
+        coll = client[db_name][kwargs['collection']]
+        coll.create_indexes(parsed_indexes)
+    except Exception as e:
+        module.fail_json(msg='Unable to create indexes: %s' % to_native(e), 
exception=traceback.format_exc())
+
+    module.exit_json(changed=True, indexes=kwargs['indexes'])
+
+
+OPERATIONS = {
+    'user': { 'function': user, 'params': ['user', 'password', 'roles', 
'force_update'], 'required': ['user', 'password']},
+    'doc': {'function': doc, 'params': ['doc', 'collection', 'force_update'], 
'required': ['doc', 'collection']},
+    'index': {'function': index, 'params': ['indexes', 'collection'], 
'required': ['indexes', 'collection']}
+}
+
+
+# =========================================
+# Module execution.
+#
+
+def main():
+    module = AnsibleModule(
+        argument_spec=dict(
+            connect_string=dict(required=True),
+            database=dict(required=True, aliases=['db']),
+            mode=dict(default='user', choices=['user', 'doc', 'index']),
+            user=dict(default=None),
+            password=dict(default=None, no_log=True),
+            roles=dict(default=None, type='list'),
+            collection=dict(default=None),
+            doc=dict(default=None, type='dict'),
+            force_update=dict(default=False, type='bool'),
+            indexes=dict(default=None, type='list'),
+        )
+    )
+
+    mode = module.params['mode']
+
+    db_name = module.params['database']
+
+    params = {key: module.params[key] for key in OPERATIONS[mode]['params']}
+    check_params(params, mode, module)
+
+    try:
+        client = MongoClient(module.params['connect_string'])
+    except NameError:
+        module.fail_json(msg='the python pymongo module is required')
+    except Exception as e:
+        module.fail_json(msg='unable to connect to database: %s' % 
to_native(e), exception=traceback.format_exc())
+
+    OPERATIONS[mode]['function'](module, client, db_name, **params)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/mongodb.yml
similarity index 60%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/mongodb.yml
index 5962400..2a0b4f6 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/mongodb.yml
@@ -15,16 +15,22 @@
 # limitations under the License.
 #
 ---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This playbook deploys MongoDB for OpenWhisk.
 
-- name: check if {{ dbName }} with {{ db.provider }} exists
-  uri:
-    url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
-    method: HEAD
-    status_code: 200
-    user: "{{ dbUser }}"
-    password: "{{ dbPass }}"
-    force_basic_auth: yes
+- hosts: localhost
+  tasks:
+  - name: check if db_local.ini exists?
+    tags: ini
+    stat: path="{{ playbook_dir }}/db_local.ini"
+    register: db_check
+
+  - name: prepare db_local.ini
+    tags: ini
+    local_action: template src="db_local.ini.j2" dest="{{ playbook_dir 
}}/db_local.ini"
+    when: not db_check.stat.exists
+
+# This is for testing and only deploys MongoDB on the first node; please use a sharded
+# MongoDB cluster for production environments.
+- hosts: db[0]
+  roles:
+    - mongodb
diff --git a/ansible/roles/controller/tasks/deploy.yml 
b/ansible/roles/controller/tasks/deploy.yml
index 21f33a6..931d657 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -296,6 +296,19 @@
     env: "{{ env | combine(elastic_env) }}"
   when: db.activation_store.backend == "ElasticSearch"
 
+- name: setup mongodb artifact store env
+  set_fact:
+    mongodb_env:
+      "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}"
+      "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}"
+      "CONFIG_whisk_spi_ArtifactStoreProvider": 
"org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider"
+  when: db.artifact_store.backend == "MongoDB"
+
+- name: merge mongodb artifact store env
+  set_fact:
+    env: "{{ env | combine(mongodb_env) }}"
+  when: db.artifact_store.backend == "MongoDB"
+
 - name: populate volumes for controller
   set_fact:
     controller_volumes:
diff --git a/ansible/roles/invoker/tasks/deploy.yml 
b/ansible/roles/invoker/tasks/deploy.yml
index 591a07e..28d8ead 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -315,6 +315,19 @@
     env: "{{ env | combine(elastic_env) }}"
   when: db.activation_store.backend == "ElasticSearch"
 
+- name: setup mongodb artifact store env
+  set_fact:
+    mongodb_env:
+      "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}"
+      "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}"
+      "CONFIG_whisk_spi_ArtifactStoreProvider": 
"org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider"
+  when: db.artifact_store.backend == "MongoDB"
+
+- name: merge mongodb artifact store env
+  set_fact:
+    env: "{{ env | combine(mongodb_env) }}"
+  when: db.artifact_store.backend == "MongoDB"
+
 - name: include plugins
   include_tasks: "{{ inv_item }}.yml"
   with_items: "{{ invoker_plugins | default([]) }}"
diff --git a/ansible/tasks/db/checkDb.yml 
b/ansible/roles/mongodb/tasks/clean.yml
similarity index 63%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/roles/mongodb/tasks/clean.yml
index 5962400..248ee55 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/roles/mongodb/tasks/clean.yml
@@ -15,16 +15,10 @@
 # limitations under the License.
 #
 ---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# Remove MongoDB server
 
-- name: check if {{ dbName }} with {{ db.provider }} exists
-  uri:
-    url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
-    method: HEAD
-    status_code: 200
-    user: "{{ dbUser }}"
-    password: "{{ dbPass }}"
-    force_basic_auth: yes
+- name: remove MongoDB
+  docker_container:
+    name: mongodb
+    state: absent
+    keep_volumes: False
diff --git a/ansible/tasks/db/checkDb.yml 
b/ansible/roles/mongodb/tasks/deploy.yml
similarity index 53%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/roles/mongodb/tasks/deploy.yml
index 5962400..14bf146 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/roles/mongodb/tasks/deploy.yml
@@ -15,16 +15,24 @@
 # limitations under the License.
 #
 ---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This role will run a MongoDB server on the db group. This is only for testing; please
+# use a sharded cluster for production environments.
 
-- name: check if {{ dbName }} with {{ db.provider }} exists
-  uri:
-    url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
-    method: HEAD
-    status_code: 200
-    user: "{{ dbUser }}"
-    password: "{{ dbPass }}"
-    force_basic_auth: yes
+- name: (re)start mongodb
+  vars:
+    mongodb_image: "{{ mongodb.docker_image | default('mongo:' ~ 
mongodb.version ) }}"
+  docker_container:
+    name: mongodb
+    image: "{{ mongodb_image }}"
+    state: started
+    recreate: true
+    restart_policy: "{{ docker.restart.policy }}"
+    hostname: "mongodb"
+    user: "mongodb"
+    volumes:
+      - "{{ db.mongodb.data_volume }}:/data/db"
+    ports:
+      - "27017:27017"
+
+- name: wait until the MongoDB in this host is up and running
+  local_action: wait_for host={{ ansible_host }} port=27017 state=started 
delay=5 timeout=60
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/roles/mongodb/tasks/main.yml
similarity index 63%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/roles/mongodb/tasks/main.yml
index 5962400..2d27a9c 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/roles/mongodb/tasks/main.yml
@@ -15,16 +15,13 @@
 # limitations under the License.
 #
 ---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This role will deploy a database server. Use the role if you want to run MongoDB locally.
+# In deploy mode it will start the MongoDB container.
+# In clean mode it will remove the MongoDB container.
+
+- import_tasks: deploy.yml
+  when: mode == "deploy"
+
+- import_tasks: clean.yml
+  when: mode == "clean"
 
-- name: check if {{ dbName }} with {{ db.provider }} exists
-  uri:
-    url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
-    method: HEAD
-    status_code: 200
-    user: "{{ dbUser }}"
-    password: "{{ dbPass }}"
-    force_basic_auth: yes
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/tasks/db/checkDb.yml
index 5962400..bb78a66 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/tasks/db/checkDb.yml
@@ -28,3 +28,7 @@
     user: "{{ dbUser }}"
     password: "{{ dbPass }}"
     force_basic_auth: yes
+  when: db.artifact_store.backend == "CouchDB"
+
+# the collection in MongoDB doesn't need to be created in advance, so just 
skip it
+# - name: check if {{ dbName }} on MongoDB exists
diff --git a/ansible/templates/whisk.conf.j2 b/ansible/templates/whisk.conf.j2
index 9bb9d53..c6ef559 100644
--- a/ansible/templates/whisk.conf.j2
+++ b/ansible/templates/whisk.conf.j2
@@ -14,4 +14,14 @@ whisk {
       WhiskActivation = "{{ db.whisk.activations }}"
     }
   }
+  {% if db.artifact_store.backend == 'MongoDB' %}
+  mongodb {
+    uri         = "{{ db.mongodb.connect_string }}"
+    database    = "{{ db.mongodb.database }}"
+  }
+
+  spi {
+    ArtifactStoreProvider = 
org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider
+  }
+  {% endif %}
 }
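
With the defaults in ansible/group_vars/all and db_artifact_backend set to "MongoDB", the
template above would render roughly the following block into whisk.conf (an illustrative
rendering, not part of the patch):

    mongodb {
      uri         = "mongodb://172.17.0.1:27017"
      database    = "whisks"
    }

    spi {
      ArtifactStoreProvider = org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider
    }
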
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 8211f55..1dddd97 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -90,6 +90,8 @@ dependencies {
     compile "com.microsoft.azure:azure-cosmosdb:2.6.2"
 
     compile 
"com.sksamuel.elastic4s:elastic4s-http_${gradle.scala.depVersion}:6.7.4"
+    //for mongo
+    compile 
"org.mongodb.scala:mongo-scala-driver_${gradle.scala.depVersion}:2.7.0"
 
     compile 
("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") {
         exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses 
akka-http
diff --git a/common/scala/src/main/resources/application.conf 
b/common/scala/src/main/resources/application.conf
index 287b5c3..7600638 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -381,6 +381,13 @@ whisk {
         #}
     }
 
+    # MongoDB related configuration
+    # For example:
+    # mongodb {
+    #    uri         = mongodb://localhost:27017  # DB Uri
+    #    database    =             # Database name
+    #}
+
     # transaction ID related configuration
     transactions {
         header = "X-Request-ID"
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala 
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 1755abe..00d876e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -208,6 +208,7 @@ object ConfigKeys {
 
   val couchdb = "whisk.couchdb"
   val cosmosdb = "whisk.cosmosdb"
+  val mongodb = "whisk.mongodb"
   val kafka = "whisk.kafka"
   val kafkaCommon = s"$kafka.common"
   val kafkaProducer = s"$kafka.producer"
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala
new file mode 100644
index 0000000..bab4a1f
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import akka.actor.ActorSystem
+import akka.event.Logging.ErrorLevel
+import akka.http.scaladsl.model._
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import com.mongodb.client.gridfs.model.GridFSUploadOptions
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.database.StoreUtils._
+import org.apache.openwhisk.core.entity.Attachments.Attached
+import org.apache.openwhisk.core.entity.{DocId, DocInfo, DocRevision, 
DocumentReader, UUID}
+import org.apache.openwhisk.http.Messages
+import org.bson.json.{JsonMode, JsonWriterSettings}
+import org.mongodb.scala.bson.BsonString
+import org.mongodb.scala.bson.collection.immutable.Document
+import org.mongodb.scala.gridfs.{GridFSBucket, GridFSFile, 
MongoGridFSException}
+import org.mongodb.scala.model._
+import org.mongodb.scala.{MongoClient, MongoCollection, MongoException}
+import spray.json._
+
+import scala.concurrent.Future
+import scala.util.Try
+
+object MongoDBArtifactStore {
+  val _computed = "_computed"
+}
+
+/**
+ * Basic client to put and delete artifacts in a data store.
+ *
+ * @param client the mongodb client to access database
+ * @param dbName the name of the database to operate on
+ * @param collName the name of the collection to operate on
+ * @param documentHandler helper class that helps simulate the design docs of CouchDB
+ * @param viewMapper helper class that helps simulate the design docs of CouchDB
+ */
+class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client: 
MongoClient,
+                                                                      dbName: 
String,
+                                                                      
collName: String,
+                                                                      
documentHandler: DocumentHandler,
+                                                                      
viewMapper: MongoDBViewMapper,
+                                                                      val 
inliningConfig: InliningConfig,
+                                                                      val 
attachmentStore: Option[AttachmentStore])(
+  implicit system: ActorSystem,
+  val logging: Logging,
+  jsonFormat: RootJsonFormat[DocumentAbstraction],
+  val materializer: ActorMaterializer,
+  docReader: DocumentReader)
+    extends ArtifactStore[DocumentAbstraction]
+    with DocumentProvider
+    with DefaultJsonProtocol
+    with AttachmentSupport[DocumentAbstraction] {
+
+  import MongoDBArtifactStore._
+
+  protected[core] implicit val executionContext = system.dispatcher
+
+  private val mongodbScheme = "mongodb"
+  val attachmentScheme: String = 
attachmentStore.map(_.scheme).getOrElse(mongodbScheme)
+
+  private val database = client.getDatabase(dbName)
+  private val collection = getCollectionAndCreateIndexes()
+  private val gridFSBucket = GridFSBucket(database, collName)
+
+  private val jsonWriteSettings = 
JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build
+
+  // MongoDB doesn't support using `$` as the first char of a field name, so the two fields below need to be encoded first
+  private val fieldsNeedEncode = Seq("annotations", "parameters")
+
+  override protected[database] def put(d: DocumentAbstraction)(implicit 
transid: TransactionId): Future[DocInfo] = {
+    val asJson = d.toDocumentRecord
+
+    val id: String = asJson.fields.getOrElse("_id", 
JsString.empty).convertTo[String].trim
+    require(!id.isEmpty, "document id must be defined")
+
+    val (old_rev, rev) = revisionCalculate(asJson)
+    val docinfoStr = s"id: $id, rev: $rev"
+    val start =
+      transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName' 
saving document: '$docinfoStr'")
+
+    val encodedData = encodeFields(fieldsNeedEncode, asJson)
+
+    val data = JsObject(
+      encodedData.fields + (_computed -> 
documentHandler.computedFields(asJson)) + ("_rev" -> rev.toJson))
+
+    val filters =
+      if (rev.startsWith("1-")) {
+        // for a new document, we expect no matching document and insert a new one;
+        // if there is a matching document without a _rev field, it will be replaced;
+        // if there is a document with the same id that has a _rev field, an E11000 (conflict) error is returned
+        Filters.and(Filters.eq("_id", id), Filters.not(Filters.exists("_rev")))
+      } else {
+        // for an existing document, we expect to find a matching document and replace it;
+        // if no matching document is found, the upsert tries to insert a new one and MongoDB returns an E11000 error
+        Filters.and(Filters.eq("_id", id), Filters.eq("_rev", old_rev))
+      }
+
+    val f =
+      collection
+        .findOneAndReplace(
+          filters,
+          Document(data.compactPrint),
+          
FindOneAndReplaceOptions().upsert(true).returnDocument(ReturnDocument.AFTER))
+        .toFuture()
+        .map { doc =>
+          transid.finished(this, start, s"[PUT] '$collName' completed 
document: '$docinfoStr', document: '$doc'")
+          DocInfo(DocId(id), DocRevision(rev))
+        }
+        .recover {
+          //  E11000 means a duplicate key error
+          case t: MongoException if t.getCode == 11000 =>
+            transid.finished(this, start, s"[PUT] '$dbName', document: 
'$docinfoStr'; conflict.")
+            throw DocumentConflictException("conflict on 'put'")
+          case t: MongoException =>
+            transid.failed(
+              this,
+              start,
+              s"[PUT] '$dbName' failed to put document: '$docinfoStr'; return 
error code: '${t.getCode}'",
+              ErrorLevel)
+            throw new Exception("Unexpected mongodb server error: " + 
t.getMessage)
+        }
+
+    reportFailure(
+      f,
+      failure =>
+        transid
+          .failed(this, start, s"[PUT] '$collName' internal error, failure: 
'${failure.getMessage}'", ErrorLevel))
+  }
+
+  override protected[database] def del(doc: DocInfo)(implicit transid: 
TransactionId): Future[Boolean] = {
+    require(doc != null && doc.rev.asString != null, "doc revision required 
for delete")
+
+    val start =
+      transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] 
'$collName' deleting document: '$doc'")
+
+    val f = collection
+      .deleteOne(Filters.and(Filters.eq("_id", doc.id.id), Filters.eq("_rev", 
doc.rev.rev)))
+      .toFuture()
+      .flatMap { result =>
+        if (result.getDeletedCount == 1) { // the result can only be 1 or 0
+          transid.finished(this, start, s"[DEL] '$collName' completed 
document: '$doc'")
+          Future(true)
+        } else {
+          collection.find(Filters.eq("_id", doc.id.id)).toFuture.map { result 
=>
+            if (result.size == 1) {
+              // a document with this _id exists but its _rev did not match: conflict
+              transid.finished(this, start, s"[DEL] '$collName', document: 
'$doc'; conflict.")
+              throw DocumentConflictException("conflict on 'delete'")
+            } else {
+              // no document with this _id exists: not found
+              transid.finished(this, start, s"[DEL] '$collName', document: 
'$doc'; not found.")
+              throw NoDocumentException(s"$doc not found on 'delete'")
+            }
+          }
+        }
+      }
+      .recover {
+        case t: MongoException =>
+          transid.failed(
+            this,
+            start,
+            s"[DEL] '$collName' failed to delete document: '$doc'; error code: 
'${t.getCode}'",
+            ErrorLevel)
+          throw new Exception("Unexpected mongodb server error: " + 
t.getMessage)
+      }
+
+    reportFailure(
+      f,
+      failure =>
+        transid.failed(
+          this,
+          start,
+          s"[DEL] '$collName' internal error, doc: '$doc', failure: 
'${failure.getMessage}'",
+          ErrorLevel))
+  }
+
+  override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
+                                                                 
attachmentHandler: Option[(A, Attached) => A] = None)(
+    implicit transid: TransactionId,
+    ma: Manifest[A]): Future[A] = {
+
+    val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] 
'$dbName' finding document: '$doc'")
+
+    require(doc != null, "doc undefined")
+
+    val f = collection
+      .find(Filters.eq("_id", doc.id.id)) // method deserialize will check 
whether the _rev matched
+      .toFuture()
+      .map(result =>
+        if (result.isEmpty) {
+          transid.finished(this, start, s"[GET] '$collName', document: '$doc'; 
not found.")
+          throw NoDocumentException("not found on 'get'")
+        } else {
+          transid.finished(this, start, s"[GET] '$collName' completed: found 
document '$doc'")
+          val response = 
result.head.toJson(jsonWriteSettings).parseJson.asJsObject
+          val decodeData = decodeFields(fieldsNeedEncode, response)
+
+          val deserializedDoc = deserialize[A, DocumentAbstraction](doc, 
decodeData)
+          attachmentHandler
+            .map(processAttachments(deserializedDoc, decodeData, doc.id.id, _))
+            .getOrElse(deserializedDoc)
+      })
+      .recoverWith {
+        case t: MongoException =>
+          transid.finished(this, start, s"[GET] '$collName' failed to get 
document: '$doc'; error code: '${t.getCode}'")
+          throw new Exception("Unexpected mongodb server error: " + 
t.getMessage)
+        case _: DeserializationException => throw 
DocumentUnreadable(Messages.corruptedEntity)
+      }
+
+    reportFailure(
+      f,
+      failure =>
+        transid.failed(
+          this,
+          start,
+          s"[GET] '$collName' internal error, doc: '$doc', failure: 
'${failure.getMessage}'",
+          ErrorLevel))
+  }
+
+  override protected[database] def get(id: DocId)(implicit transid: 
TransactionId): Future[Option[JsObject]] = {
+    val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] 
'$collName' finding document: '$id'")
+    val f = collection
+      .find(Filters.equal("_id", id.id))
+      .head()
+      .map {
+        case d: Document =>
+          transid.finished(this, start, s"[GET] '$dbName' completed: found 
document '$id'")
+          Some(decodeFields(fieldsNeedEncode, 
d.toJson(jsonWriteSettings).parseJson.asJsObject))
+        case null =>
+          transid.finished(this, start, s"[GET] '$dbName', document: '$id'; 
not found.")
+          None
+      }
+      .recover {
+        case t: MongoException =>
+          transid.failed(
+            this,
+            start,
+            s"[GET] '$collName' failed to get document: '$id'; error code: 
'${t.getCode}'",
+            ErrorLevel)
+          throw new Exception("Unexpected mongodb server error: " + 
t.getMessage)
+      }
+
+    reportFailure(
+      f,
+      failure =>
+        transid.failed(
+          this,
+          start,
+          s"[GET] '$collName' internal error, doc: '$id', failure: 
'${failure.getMessage}'",
+          ErrorLevel))
+  }
+
+  override protected[core] def query(table: String,
+                                     startKey: List[Any],
+                                     endKey: List[Any],
+                                     skip: Int,
+                                     limit: Int,
+                                     includeDocs: Boolean,
+                                     descending: Boolean,
+                                     reduce: Boolean,
+                                     stale: StaleParameter)(implicit transid: 
TransactionId): Future[List[JsObject]] = {
+    require(!(reduce && includeDocs), "reduce and includeDocs cannot both be 
true")
+    require(!reduce, "Reduce scenario not supported") //TODO Investigate reduce
+    require(skip >= 0, "skip should be non negative")
+    require(limit >= 0, "limit should be non negative")
+
+    val Array(ddoc, viewName) = table.split("/")
+
+    val find = collection
+      .find(viewMapper.filter(ddoc, viewName, startKey, endKey))
+
+    viewMapper.sort(ddoc, viewName, descending).foreach(find.sort)
+
+    find.skip(skip).limit(limit)
+
+    val realIncludeDocs = includeDocs | 
documentHandler.shouldAlwaysIncludeDocs(ddoc, viewName)
+    val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] 
'$collName' searching '$table")
+
+    val f = find
+      .toFuture()
+      .map { docs =>
+        transid.finished(this, start, s"[QUERY] '$dbName' completed: matched 
${docs.size}")
+        docs.map { doc =>
+          val js = decodeFields(fieldsNeedEncode, 
doc.toJson(jsonWriteSettings).parseJson.convertTo[JsObject])
+          documentHandler.transformViewResult(
+            ddoc,
+            viewName,
+            startKey,
+            endKey,
+            realIncludeDocs,
+            JsObject(js.fields - _computed),
+            MongoDBArtifactStore.this)
+        }
+      }
+      .flatMap(Future.sequence(_))
+      .map(_.flatten.toList)
+      .recover {
+        case t: MongoException =>
+          transid.failed(this, start, s"[QUERY] '$collName' failed; error 
code: '${t.getCode}'", ErrorLevel)
+          throw new Exception("Unexpected mongodb server error: " + 
t.getMessage)
+      }
+
+    reportFailure(
+      f,
+      failure =>
+        transid
+          .failed(this, start, s"[QUERY] '$collName' internal error, failure: 
'${failure.getMessage}'", ErrorLevel))
+  }
+
+  protected[core] def count(table: String, startKey: List[Any], endKey: 
List[Any], skip: Int, stale: StaleParameter)(
+    implicit transid: TransactionId): Future[Long] = {
+    require(skip >= 0, "skip should be non negative")
+
+    val Array(ddoc, viewName) = table.split("/")
+    val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT] 
'$dbName' searching '$table")
+
+    val query = viewMapper.filter(ddoc, viewName, startKey, endKey)
+
+    val option = CountOptions().skip(skip)
+    val f =
+      collection
+        .countDocuments(query, option)
+        .toFuture()
+        .map { result =>
+          transid.finished(this, start, s"[COUNT] '$collName' completed: count 
$result")
+          result
+        }
+        .recover {
+          case t: MongoException =>
+            transid.failed(this, start, s"[COUNT] '$collName' failed; error 
code: '${t.getCode}'", ErrorLevel)
+            throw new Exception("Unexpected mongodb server error: " + 
t.getMessage)
+        }
+
+    reportFailure(
+      f,
+      failure =>
+        transid
+          .failed(this, start, s"[COUNT] '$dbName' internal error, failure: 
'${failure.getMessage}'", ErrorLevel))
+  }
+
+  override protected[database] def putAndAttach[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId): 
Future[(DocInfo, Attached)] = {
+
+    attachmentStore match {
+      case Some(as) =>
+        attachToExternalStore(doc, update, contentType, docStream, 
oldAttachment, as)
+      case None =>
+        attachToMongo(doc, update, contentType, docStream, oldAttachment)
+    }
+
+  }
+
+  private def attachToMongo[A <: DocumentAbstraction](
+    doc: A,
+    update: (A, Attached) => A,
+    contentType: ContentType,
+    docStream: Source[ByteString, _],
+    oldAttachment: Option[Attached])(implicit transid: TransactionId): 
Future[(DocInfo, Attached)] = {
+
+    for {
+      bytesOrSource <- inlineOrAttach(docStream)
+      uri = uriOf(bytesOrSource, UUID().asString)
+      attached <- {
+        bytesOrSource match {
+          case Left(bytes) =>
+            Future.successful(Attached(uri.toString, contentType, 
Some(bytes.size), Some(digest(bytes))))
+          case Right(source) =>
+            attach(doc, uri.path.toString, contentType, source).map { r =>
+              Attached(uri.toString, contentType, Some(r.length), 
Some(r.digest))
+            }
+        }
+      }
+      docInfo <- put(update(doc, attached))
+
+      //Remove old attachment if it was part of attachmentStore
+      _ <- oldAttachment
+        .map { old =>
+          val oldUri = Uri(old.attachmentName)
+          if (oldUri.scheme == mongodbScheme) {
+            val name = oldUri.path.toString
+            
gridFSBucket.delete(BsonString(s"${docInfo.id.id}/$name")).toFuture.map { _ =>
+              true
+            }
+          } else {
+            Future.successful(true)
+          }
+        }
+        .getOrElse(Future.successful(true))
+    } yield (docInfo, attached)
+  }
+
+  private def attach(d: DocumentAbstraction, name: String, contentType: 
ContentType, docStream: Source[ByteString, _])(
+    implicit transid: TransactionId): Future[AttachResult] = {
+
+    logging.info(this, s"Uploading attach $name")
+    val asJson = d.toDocumentRecord
+    val id: String = asJson.fields("_id").convertTo[String].trim
+    require(!id.isEmpty, "document id must be defined")
+
+    val start = transid.started(
+      this,
+      LoggingMarkers.DATABASE_ATT_SAVE,
+      s"[ATT_PUT] '$collName' uploading attachment '$name' of document 'id: 
$id'")
+
+    val document: org.bson.Document = new org.bson.Document("contentType", 
contentType.toString)
+    //add the document id to the metadata
+    document.append("belongsTo", id)
+
+    val option = new GridFSUploadOptions().metadata(document)
+
+    val uploadStream = gridFSBucket.openUploadStream(BsonString(s"$id/$name"), 
name, option)
+    val sink = MongoDBAsyncStreamSink(uploadStream)
+
+    val f = docStream
+      .runWith(combinedSink(sink))
+      .map { r =>
+        transid
+          .finished(this, start, s"[ATT_PUT] '$collName' completed uploading 
attachment '$name' of document '$id'")
+        AttachResult(r.digest, r.length)
+      }
+      .recover {
+        case t: MongoException =>
+          transid.failed(
+            this,
+            start,
+            s"[ATT_PUT] '$collName' failed to upload attachment '$name' of 
document '$id'; error code '${t.getCode}'",
+            ErrorLevel)
+          throw new Exception("Unexpected mongodb server error: " + 
t.getMessage)
+      }
+
+    reportFailure(
+      f,
+      failure =>
+        transid.failed(
+          this,
+          start,
+          s"[ATT_PUT] '$collName' internal error, name: '$name', doc: '$id', 
failure: '${failure.getMessage}'",
+          ErrorLevel))
+  }
+
+  override protected[core] def readAttachment[T](doc: DocInfo, attached: 
Attached, sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+
+    val name = attached.attachmentName
+    val attachmentUri = Uri(name)
+
+    attachmentUri.scheme match {
+      case AttachmentSupport.MemScheme =>
+        memorySource(attachmentUri).runWith(sink)
+      case s if s == mongodbScheme || attachmentUri.isRelative =>
+        //relative case is for compatibility with earlier naming approach 
where attachment name would be like 'jarfile'
+        //Compared to current approach of '<scheme>:<name>'
+        readAttachmentFromMongo(doc, attachmentUri, sink)
+      case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
+        attachmentStore.get.readAttachment(doc.id, 
attachmentUri.path.toString, sink)
+      case _ =>
+        throw new IllegalArgumentException(s"Unknown attachment scheme in 
attachment uri $attachmentUri")
+    }
+  }
+
+  private def readAttachmentFromMongo[T](doc: DocInfo, attachmentUri: Uri, 
sink: Sink[ByteString, Future[T]])(
+    implicit transid: TransactionId): Future[T] = {
+
+    val attachmentName = attachmentUri.path.toString
+    val start = transid.started(
+      this,
+      LoggingMarkers.DATABASE_ATT_GET,
+      s"[ATT_GET] '$dbName' finding attachment '$attachmentName' of document 
'$doc'")
+
+    require(doc != null, "doc undefined")
+    require(doc.rev.rev != null, "doc revision must be specified")
+
+    val downloadStream = 
gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName"))
+
+    def readStream(file: GridFSFile) = {
+      val source = MongoDBAsyncStreamSource(downloadStream)
+      source
+        .runWith(sink)
+        .map { result =>
+          transid
+            .finished(
+              this,
+              start,
+              s"[ATT_GET] '$collName' completed: found attachment 
'$attachmentName' of document '$doc'")
+          result
+        }
+    }
+
+    def getGridFSFile = {
+      downloadStream
+        .gridFSFile()
+        .head()
+        .transform(
+          identity, {
+            case ex: MongoGridFSException if ex.getMessage.contains("File not 
found") =>
+              transid.finished(
+                this,
+                start,
+                s"[ATT_GET] '$collName', retrieving attachment 
'$attachmentName' of document '$doc'; not found.")
+              NoDocumentException("Not found on 'readAttachment'.")
+            case ex: MongoGridFSException =>
+              transid.failed(
+                this,
+                start,
+                s"[ATT_GET] '$collName' failed to get attachment 
'$attachmentName' of document '$doc'; error code: '${ex.getCode}'",
+                ErrorLevel)
+              throw new Exception("Unexpected mongodb server error: " + 
ex.getMessage)
+            case t => t
+          })
+    }
+
+    val f = for {
+      file <- getGridFSFile
+      result <- readStream(file)
+    } yield result
+
+    reportFailure(
+      f,
+      failure =>
+        transid.failed(
+          this,
+          start,
+          s"[ATT_GET] '$dbName' internal error, name: '$attachmentName', doc: 
'$doc', failure: '${failure.getMessage}'",
+          ErrorLevel))
+
+  }
+
+  override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit 
transid: TransactionId): Future[Boolean] =
+    attachmentStore
+      .map(as => as.deleteAttachments(doc.id))
+      .getOrElse(Future.successful(true)) // For MongoDB it is expected that 
the entire document is deleted.
+
+  override def shutdown(): Unit = {
+    // MongoClient maintains the connection pool internally, we don't need to 
manage it
+    attachmentStore.foreach(_.shutdown())
+  }
+
+  private def reportFailure[T, U](f: Future[T], onFailure: Throwable => U): 
Future[T] = {
+    f.failed.foreach {
+      case _: ArtifactStoreException => // These failures are intentional and 
shouldn't trigger the catcher.
+      case x                         => onFailure(x)
+    }
+    f
+  }
+
+  // calculate the revision manually, to be compatible with couchdb's _rev 
field
+  private def revisionCalculate(doc: JsObject): (String, String) = {
+    val md = StoreUtils.emptyDigest()
+    val new_rev =
+      md.digest(doc.toString.getBytes()).map(0xFF & _).map { "%02x".format(_) 
}.foldLeft("") { _ + _ }.take(32)
+    doc.fields
+      .get("_rev")
+      .map { value =>
+        val start = value.convertTo[String].trim.split("-").apply(0).toInt + 1
+        (value.convertTo[String].trim, s"$start-$new_rev")
+      }
+      .getOrElse {
+        ("", s"1-$new_rev")
+      }
+  }
+
+  private def processAttachments[A <: DocumentAbstraction](doc: A,
+                                                           js: JsObject,
+                                                           docId: String,
+                                                           attachmentHandler: 
(A, Attached) => A): A = {
+    js.fields("exec").asJsObject().fields.get("code").map {
+      case code: JsObject =>
+        code.getFields("attachmentName", "attachmentType", "digest", "length") 
match {
+          case Seq(JsString(name), JsString(contentTypeValue), 
JsString(digest), JsNumber(length)) =>
+            val contentType = ContentType.parse(contentTypeValue) match {
+              case Right(ct) => ct
+              case Left(_)   => ContentTypes.NoContentType //Should not happen
+            }
+            attachmentHandler(doc, Attached(getAttachmentName(name), 
contentType, Some(length.longValue), Some(digest)))
+          case x =>
+            throw DeserializationException("Attachment json does not have 
required fields" + x)
+        }
+      case _ => doc
+    } getOrElse {
+      doc // This should not happen as an action always contains the field exec.code
+    }
+  }
+
+  /**
+   * Determines whether the attachment name conforms to the new UUID-based scheme
+   * and generates the name accordingly
+   */
+  private def getAttachmentName(name: String): String = {
+    Try(java.util.UUID.fromString(name))
+      .map(_ => Uri.from(scheme = attachmentScheme, path = name).toString)
+      .getOrElse(name)
+  }
+
+  private def getCollectionAndCreateIndexes(): MongoCollection[Document] = {
+    val coll = database.getCollection(collName)
+    // create indexes in specific collection if they do not exist
+    coll.listIndexes().toFuture().map { idxes =>
+      val keys = idxes.map {
+        _.get("key").map { fields =>
+          Document(fields.asDocument())
+        } getOrElse {
+          Document.empty // this should not happen
+        }
+      }
+
+      viewMapper.indexes.foreach { idx =>
+        if (!keys.contains(idx))
+          coll.createIndex(idx).toFuture
+      }
+    }
+    coll
+  }
+
+  // encode JsValues that have a complex and arbitrary structure into JsStrings
+  private def encodeFields(fields: Seq[String], jsValue: JsObject): JsObject = 
{
+    var data = jsValue.fields
+    fields.foreach { field =>
+      data.get(field).foreach { value =>
+        data = data.updated(field, JsString(value.compactPrint))
+      }
+    }
+    JsObject(data)
+  }
+
+  // decode fields from JsString
+  private def decodeFields(fields: Seq[String], jsValue: JsObject): JsObject = 
{
+    var data = jsValue.fields
+    fields.foreach { field =>
+      data.get(field).foreach { value =>
+        Try {
+          data = data.updated(field, 
value.asInstanceOf[JsString].value.parseJson)
+        }
+      }
+    }
+    JsObject(data)
+  }
+}
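
A minimal, self-contained sketch of the encode/decode workaround above for the `annotations`
and `parameters` fields (illustrative only, not part of the patch; the object name is
hypothetical):

```
import spray.json._

object EncodeFieldsSketch extends App {
  // Store the listed fields as JSON strings so keys starting with `$` never
  // become MongoDB field names.
  def encode(fields: Seq[String], js: JsObject): JsObject =
    JsObject(fields.foldLeft(js.fields) { (data, f) =>
      data.get(f).fold(data)(v => data.updated(f, JsString(v.compactPrint)))
    })

  // Parse the stored JSON strings back into their original structure.
  def decode(fields: Seq[String], js: JsObject): JsObject =
    JsObject(fields.foldLeft(js.fields) { (data, f) =>
      data.get(f).fold(data) {
        case JsString(s) => data.updated(f, s.parseJson)
        case other       => data.updated(f, other)
      }
    })

  val doc = """{"name":"action1","annotations":[{"key":"$scheme","value":"x"}]}""".parseJson.asJsObject
  val stored = encode(Seq("annotations", "parameters"), doc)
  println(stored.compactPrint)                                     // annotations is now a JSON string
  println(decode(Seq("annotations", "parameters"), stored) == doc) // true: the round trip is lossless
}
```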
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala
new file mode 100644
index 0000000..555b274
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{DocumentReader, WhiskActivation, 
WhiskAuth, WhiskEntity}
+import org.mongodb.scala.MongoClient
+import pureconfig._
+import pureconfig.generic.auto._
+import spray.json.RootJsonFormat
+
+import scala.reflect.ClassTag
+
+case class MongoDBConfig(uri: String, database: String) {
+  assume(Set(database, uri).forall(_.trim.nonEmpty), "At least one expected 
property is missing")
+
+  def collectionFor[D](implicit tag: ClassTag[D]) = 
tag.runtimeClass.getSimpleName.toLowerCase
+}
+
+object MongoDBClient {
+  private var _client: Option[MongoClient] = None
+
+  def client(config: MongoDBConfig): MongoClient = {
+    _client.getOrElse {
+      val client = MongoClient(config.uri)
+      _client = Some(client)
+      client
+    }
+  }
+}
+
+object MongoDBArtifactStoreProvider extends ArtifactStoreProvider {
+
+  def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
+    val dbConfig = loadConfigOrThrow[MongoDBConfig](ConfigKeys.mongodb)
+    makeArtifactStore(dbConfig, getAttachmentStore())
+  }
+
+  def makeArtifactStore[D <: DocumentSerializer: ClassTag](dbConfig: MongoDBConfig,
+                                                           attachmentStore: Option[AttachmentStore])(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
+
+    val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
+
+    val (handler, mapper) = handlerAndMapper(implicitly[ClassTag[D]])
+
+    new MongoDBArtifactStore[D](
+      MongoDBClient.client(dbConfig),
+      dbConfig.database,
+      dbConfig.collectionFor[D],
+      handler,
+      mapper,
+      inliningConfig,
+      attachmentStore)
+  }
+
+  private def handlerAndMapper[D](entityType: ClassTag[D])(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): (DocumentHandler, MongoDBViewMapper) = {
+    entityType.runtimeClass match {
+      case x if x == classOf[WhiskEntity] =>
+        (WhisksHandler, WhisksViewMapper)
+      case x if x == classOf[WhiskActivation] =>
+        (ActivationHandler, ActivationViewMapper)
+      case x if x == classOf[WhiskAuth] =>
+        (SubjectHandler, SubjectViewMapper)
+    }
+  }
+}
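
For reference, the provider derives the collection name from the entity class and shares a single MongoClient across every store created in the process. A small sketch with placeholder connection values (illustrations only, not shipped defaults):

    import org.apache.openwhisk.core.database.mongodb.{MongoDBClient, MongoDBConfig}
    import org.apache.openwhisk.core.entity.{WhiskActivation, WhiskEntity}

    // hypothetical settings; real values come from the whisk.mongodb config section
    val cfg = MongoDBConfig(uri = "mongodb://localhost:27017", database = "whisks")

    cfg.collectionFor[WhiskEntity]     // "whiskentity"
    cfg.collectionFor[WhiskActivation] // "whiskactivation"

    // the same client instance is returned for every store created afterwards
    val client = MongoDBClient.client(cfg)
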
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala
new file mode 100644
index 0000000..6a71558
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import java.nio.ByteBuffer
+
+import akka.Done
+import akka.stream.{Attributes, IOResult, Inlet, SinkShape}
+import akka.stream.scaladsl.Sink
+import akka.stream.stage.{AsyncCallback, GraphStageLogic, 
GraphStageWithMaterializedValue, InHandler}
+import akka.util.ByteString
+import org.mongodb.scala.Completed
+import org.mongodb.scala.gridfs.{AsyncOutputStream}
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+class MongoDBAsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext)
+    extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] {
+  val in: Inlet[ByteString] = Inlet("AsyncStream.in")
+
+  override val shape: SinkShape[ByteString] = SinkShape(in)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
+    val ioResultPromise = Promise[IOResult]()
+    val logic = new GraphStageLogic(shape) with InHandler {
+      handler =>
+      var buffers: Iterator[ByteBuffer] = Iterator()
+      var writeCallback: AsyncCallback[Try[Int]] = _
+      var closeCallback: AsyncCallback[Try[Completed]] = _
+      var position: Int = _
+      var writeDone = Promise[Completed]
+
+      setHandler(in, this)
+
+      override def preStart(): Unit = {
+        //close operation is async and thus requires the stage to remain open
+        //even after all data is read
+        setKeepGoing(true)
+        writeCallback = getAsyncCallback[Try[Int]](handleWriteResult)
+        closeCallback = getAsyncCallback[Try[Completed]](handleClose)
+        pull(in)
+      }
+
+      override def onPush(): Unit = {
+        buffers = grab(in).asByteBuffers.iterator
+        writeDone = Promise[Completed]
+        writeNextBufferOrPull()
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        //Work done perform close
+        //Using async "blessed" callback does not work at this stage so
+        // need to invoke as normal callback
+        //TODO Revisit this
+
+        //write of ByteBuffers from ByteString is an async operation. For last push
+        //the write operation may involve multiple async callbacks and by that time
+        //onUpstreamFinish may get invoked. So to ensure that close operation is performed
+        //"after" the last push writes are done we rely on writeDone promise
+        //and schedule the close on its completion
+        writeDone.future.onComplete(_ => stream.close().head().onComplete(handleClose))
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        fail(ex)
+      }
+
+      private def handleWriteResult(bytesWrittenOrFailure: Try[Int]): Unit = bytesWrittenOrFailure match {
+        case Success(bytesWritten) =>
+          position += bytesWritten
+          writeNextBufferOrPull()
+        case Failure(failure) => fail(failure)
+      }
+
+      private def handleClose(completed: Try[Completed]): Unit = completed match {
+        case Success(Completed()) =>
+          completeStage()
+          ioResultPromise.trySuccess(IOResult(position, Success(Done)))
+        case Failure(failure) =>
+          fail(failure)
+      }
+
+      private def writeNextBufferOrPull(): Unit = {
+        if (buffers.hasNext) {
+          stream.write(buffers.next()).head().onComplete(writeCallback.invoke)
+        } else {
+          writeDone.trySuccess(Completed())
+          pull(in)
+        }
+      }
+
+      private def fail(failure: Throwable) = {
+        failStage(failure)
+        ioResultPromise.trySuccess(IOResult(position, Failure(failure)))
+      }
+
+    }
+    (logic, ioResultPromise.future)
+  }
+}
+
+object MongoDBAsyncStreamSink {
+  def apply(stream: AsyncOutputStream)(implicit ec: ExecutionContext): Sink[ByteString, Future[IOResult]] = {
+    Sink.fromGraph(new MongoDBAsyncStreamSink(stream))
+  }
+}
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala
new file mode 100644
index 0000000..1a7fad9
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import java.nio.ByteBuffer
+
+import akka.Done
+import akka.stream.SourceShape
+import akka.stream.Attributes
+import akka.stream.Outlet
+import akka.stream.IOResult
+import akka.stream.scaladsl.Source
+import akka.stream.stage.GraphStageLogic
+import akka.stream.stage.OutHandler
+import akka.stream.stage.GraphStageWithMaterializedValue
+import akka.stream.stage.AsyncCallback
+import akka.util.ByteString
+import org.mongodb.scala.Completed
+import org.mongodb.scala.gridfs.AsyncInputStream
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.Success
+import scala.util.Try
+import scala.util.Failure
+
+class MongoDBAsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: ExecutionContext)
+    extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
+  require(chunkSize > 0, "chunkSize must be greater than 0")
+  val out: Outlet[ByteString] = Outlet("AsyncStream.out")
+
+  override val shape: SourceShape[ByteString] = SourceShape(out)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
+    val ioResultPromise = Promise[IOResult]()
+    val logic = new GraphStageLogic(shape) with OutHandler {
+      handler =>
+      val buffer = ByteBuffer.allocate(chunkSize)
+      var readCallback: AsyncCallback[Try[Int]] = _
+      var closeCallback: AsyncCallback[Try[Completed]] = _
+      var position: Int = _
+
+      setHandler(out, this)
+
+      override def preStart(): Unit = {
+        readCallback = getAsyncCallback[Try[Int]](handleBufferRead)
+        closeCallback = getAsyncCallback[Try[Completed]](handleClose)
+      }
+
+      override def onPull(): Unit = {
+        stream.read(buffer).head().onComplete(readCallback.invoke)
+      }
+
+      private def handleBufferRead(bytesReadOrFailure: Try[Int]): Unit = bytesReadOrFailure match {
+        case Success(bytesRead) if bytesRead >= 0 =>
+          buffer.flip
+          push(out, ByteString.fromByteBuffer(buffer))
+          buffer.clear
+          position += bytesRead
+        case Success(_) =>
+          stream.close().head().onComplete(closeCallback.invoke) //Work done perform close
+        case Failure(failure) =>
+          fail(failure)
+      }
+
+      private def handleClose(completed: Try[Completed]): Unit = completed match {
+        case Success(Completed()) =>
+          completeStage()
+          ioResultPromise.trySuccess(IOResult(position, Success(Done)))
+        case Failure(failure) =>
+          fail(failure)
+      }
+
+      private def fail(failure: Throwable) = {
+        failStage(failure)
+        ioResultPromise.trySuccess(IOResult(position, Failure(failure)))
+      }
+    }
+    (logic, ioResultPromise.future)
+  }
+}
+
+object MongoDBAsyncStreamSource {
+  def apply(stream: AsyncInputStream, chunkSize: Int = 512 * 1024)(
+    implicit ec: ExecutionContext): Source[ByteString, Future[IOResult]] = {
+    Source.fromGraph(new MongoDBAsyncStreamSource(stream, chunkSize))
+  }
+}
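
Together with the sink added above, this source adapts GridFS AsyncInputStream/AsyncOutputStream to Akka Streams. A minimal sketch of wiring the two stages together, using the driver's AsyncStreamHelper to wrap plain JDK streams (the object name and payload are made up for illustration):

    package org.apache.openwhisk.core.database.mongodb

    import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import akka.stream.scaladsl.Keep
    import org.mongodb.scala.gridfs.helpers.AsyncStreamHelper

    object StreamCopySketch extends App {
      implicit val system = ActorSystem("stream-copy-sketch")
      implicit val materializer = ActorMaterializer()
      implicit val ec = system.dispatcher

      val bytes = ("hello gridfs " * 100).getBytes
      val in = AsyncStreamHelper.toAsyncInputStream(new ByteArrayInputStream(bytes))
      val out = new ByteArrayOutputStream()

      // read 1 KiB chunks from the async input stream and write them back out through the sink
      val result = MongoDBAsyncStreamSource(in, chunkSize = 1024)
        .toMat(MongoDBAsyncStreamSink(AsyncStreamHelper.toAsyncOutputStream(out)))(Keep.right)
        .run()

      result.foreach { ioResult =>
        println(s"copied ${ioResult.count} bytes") // should equal bytes.length
        system.terminate()
      }
    }
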
diff --git 
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala
 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala
new file mode 100644
index 0000000..fe792be
--- /dev/null
+++ 
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.entity.WhiskQueries
+import org.mongodb.scala.Document
+import org.mongodb.scala.bson.conversions.Bson
+import org.mongodb.scala.model.Filters._
+import org.mongodb.scala.model.Sorts
+
+trait MongoDBViewMapper {
+  protected val _computed: String = "_computed"
+  protected val TOP: String = WhiskQueries.TOP
+
+  val indexes: List[Document]
+
+  def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson
+
+  def sort(ddoc: String, view: String, descending: Boolean): Option[Bson]
+
+  protected def checkKeys(startKey: List[Any], endKey: List[Any]): Unit = {
+    require(startKey.nonEmpty)
+    require(endKey.nonEmpty)
+    require(startKey.head == endKey.head, s"First key should be same => ($startKey) - ($endKey)")
+  }
+}
+
+private object ActivationViewMapper extends MongoDBViewMapper {
+  private val NS = "namespace"
+  private val NS_WITH_PATH = s"${_computed}.${ActivationHandler.NS_PATH}"
+  private val START = "start"
+  override val indexes: List[Document] =
+    List(
+      Document(s"$START" -> -1),
+      Document(s"$START" -> -1, s"$NS" -> -1),
+      Document(s"$NS_WITH_PATH" -> -1, s"$START" -> -1))
+
+  override def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = {
+    checkKeys(startKey, endKey)
+    view match {
+      //whisks-filters ddoc uses namespace + invoking action path as first key
+      case "activations" if ddoc.startsWith("whisks-filters") => 
createActivationFilter(NS_WITH_PATH, startKey, endKey)
+      //whisks ddoc uses namespace as first key
+      case "activations" if ddoc.startsWith("whisks") => 
createActivationFilter(NS, startKey, endKey)
+      case _                                          => throw 
UnsupportedView(s"$ddoc/$view")
+    }
+  }
+
+  override def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] = {
+    view match {
+      case "activations" if ddoc.startsWith("whisks-filters") =>
+        val sort = if (descending) Sorts.descending(NS_WITH_PATH, START) else Sorts.ascending(NS_WITH_PATH, START)
+        Some(sort)
+      case "activations" if ddoc.startsWith("whisks") =>
+        val sort = if (descending) Sorts.descending(NS, START) else Sorts.ascending(NS, START)
+        Some(sort)
+      case _ => throw UnsupportedView(s"$ddoc/$view")
+    }
+  }
+
+  private def createActivationFilter(nsPropName: String, startKey: List[Any], endKey: List[Any]) = {
+    require(startKey.head.isInstanceOf[String])
+    val matchNS = equal(nsPropName, startKey.head)
+
+    val filter = (startKey, endKey) match {
+      case (_ :: Nil, _ :: `TOP` :: Nil) =>
+        matchNS
+      case (_ :: since :: Nil, _ :: `TOP` :: `TOP` :: Nil) =>
+        and(matchNS, gte(START, since))
+      case (_ :: since :: Nil, _ :: upto :: `TOP` :: Nil) =>
+        and(matchNS, gte(START, since), lte(START, upto))
+      case _ => throw UnsupportedQueryKeys(s"$startKey, $endKey")
+    }
+    filter
+  }
+}
+
+private object WhisksViewMapper extends MongoDBViewMapper {
+  private val NS = "namespace"
+  private val ROOT_NS = s"${_computed}.${WhisksHandler.ROOT_NS}"
+  private val TYPE = "entityType"
+  private val UPDATED = "updated"
+  private val PUBLISH = "publish"
+  private val BINDING = "binding"
+  override val indexes: List[Document] =
+    List(Document(s"$NS" -> -1, s"$UPDATED" -> -1), Document(s"$ROOT_NS" -> 
-1, s"$UPDATED" -> -1))
+
+  override def filter(ddoc: String, view: String, startKey: List[Any], endKey: 
List[Any]): Bson = {
+    checkKeys(startKey, endKey)
+    view match {
+      case "all" => listAllInNamespace(ddoc, view, startKey, endKey)
+      case _     => listCollectionInNamespace(ddoc, view, startKey, endKey)
+    }
+  }
+
+  private def listCollectionInNamespace(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = {
+
+    val entityType = getEntityType(ddoc, view)
+
+    val matchType = equal(TYPE, entityType)
+    val matchNS = equal(NS, startKey.head)
+    val matchRootNS = equal(ROOT_NS, startKey.head)
+
+    val filter = (startKey, endKey) match {
+      case (ns :: Nil, _ :: `TOP` :: Nil) =>
+        or(and(matchType, matchNS), and(matchType, matchRootNS))
+      case (ns :: since :: Nil, _ :: `TOP` :: `TOP` :: Nil) =>
+        // @formatter:off
+                or(
+                    and(matchType, matchNS, gte(UPDATED, since)),
+                    and(matchType, matchRootNS, gte(UPDATED, since))
+                )
+            // @formatter:on
+      case (ns :: since :: Nil, _ :: upto :: `TOP` :: Nil) =>
+        or(
+          and(matchType, matchNS, gte(UPDATED, since), lte(UPDATED, upto)),
+          and(matchType, matchRootNS, gte(UPDATED, since), lte(UPDATED, upto)))
+      case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)")
+    }
+    if (view == "packages-public")
+      and(equal(BINDING, Map.empty), equal(PUBLISH, true), filter)
+    else
+      filter
+  }
+
+  private def listAllInNamespace(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = {
+    val matchRootNS = equal(ROOT_NS, startKey.head)
+    val filter = (startKey, endKey) match {
+      case (ns :: Nil, _ :: `TOP` :: Nil) =>
+        and(exists(TYPE), matchRootNS)
+      case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)")
+    }
+    filter
+  }
+
+  override def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] = {
+    view match {
+      case "actions" | "rules" | "triggers" | "packages" | "packages-public" | "all"
+          if ddoc.startsWith("whisks") || ddoc.startsWith("all-whisks") =>
+        val sort = if (descending) Sorts.descending(UPDATED) else Sorts.ascending(UPDATED)
+        Some(sort)
+      case _ => throw UnsupportedView(s"$ddoc/$view")
+    }
+  }
+
+  private def getEntityType(ddoc: String, view: String): String = view match {
+    case "actions"                      => "action"
+    case "rules"                        => "rule"
+    case "triggers"                     => "trigger"
+    case "packages" | "packages-public" => "package"
+    case _                              => throw UnsupportedView(s"$ddoc/$view")
+  }
+}
+private object SubjectViewMapper extends MongoDBViewMapper {
+  private val BLOCKED = "blocked"
+  private val SUBJECT = "subject"
+  private val UUID = "uuid"
+  private val KEY = "key"
+  private val NS_NAME = "namespaces.name"
+  private val NS_UUID = "namespaces.uuid"
+  private val NS_KEY = "namespaces.key"
+  private val CONCURRENT_INVOCATIONS = "concurrentInvocations"
+  private val INVOCATIONS_PERMINUTE = "invocationsPerMinute"
+  override val indexes: List[Document] =
+    List(Document(s"$NS_NAME" -> -1), Document(s"$NS_UUID" -> -1, s"$NS_KEY" 
-> -1))
+
+  override def filter(ddoc: String, view: String, startKey: List[Any], endKey: 
List[Any]): Bson = {
+    require(startKey == endKey, s"startKey: $startKey and endKey: $endKey must 
be same for $ddoc/$view")
+    (ddoc, view) match {
+      case (s, "identities") if s.startsWith("subjects") =>
+        filterForMatchingSubjectOrNamespace(ddoc, view, startKey, endKey)
+      case ("namespaceThrottlings", "blockedNamespaces") =>
+        or(equal(BLOCKED, true), equal(CONCURRENT_INVOCATIONS, 0), equal(INVOCATIONS_PERMINUTE, 0))
+      case _ =>
+        throw UnsupportedView(s"$ddoc/$view")
+    }
+  }
+
+  override def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] = {
+    (ddoc, view) match {
+      case (s, "identities") if s.startsWith("subjects") => None
+      case ("namespaceThrottlings", "blockedNamespaces") => None
+      case _ =>
+        throw UnsupportedView(s"$ddoc/$view")
+    }
+  }
+
+  private def filterForMatchingSubjectOrNamespace(ddoc: String,
+                                                  view: String,
+                                                  startKey: List[Any],
+                                                  endKey: List[Any]): Bson = {
+    val notBlocked = notEqual(BLOCKED, true)
+    startKey match {
+      case (ns: String) :: Nil                    => and(notBlocked, or(equal(SUBJECT, ns), equal(NS_NAME, ns)))
+      case (uuid: String) :: (key: String) :: Nil =>
+        // @formatter:off
+        and(
+          notBlocked,
+          or(
+            and(equal(UUID, uuid), equal(KEY, key)),
+            and(equal(NS_UUID, uuid), equal(NS_KEY, key))
+          ))
+      // @formatter:on
+      case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey, $endKey)")
+    }
+  }
+
+}
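
Each mapper turns a CouchDB-style view query (design doc, view name, start/end keys) into plain driver Bson, so a store can pass the result straight to a find call. A hedged sketch of how a query for recent activations in a namespace could be assembled; the collection parameter and the limit of 30 are assumptions for illustration, not code from this commit:

    package org.apache.openwhisk.core.database.mongodb

    import org.apache.openwhisk.core.entity.WhiskQueries.TOP
    import org.mongodb.scala.{Document, MongoCollection}

    import scala.concurrent.Future

    object ViewQuerySketch {
      // `collection` would be the activations collection the store already holds
      def recentActivations(collection: MongoCollection[Document], namespace: String, since: Long): Future[Seq[Document]] = {
        val ddoc = "whisks-filters.v2.1.0"
        val view = "activations"

        val filter = ActivationViewMapper.filter(ddoc, view, List(namespace, since), List(namespace, TOP, TOP))
        val sort = ActivationViewMapper.sort(ddoc, view, descending = true)

        val query = collection.find(filter)
        sort.fold(query)(query.sort).limit(30).toFuture()
      }
    }
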
diff --git a/tests/src/test/resources/application.conf.j2 
b/tests/src/test/resources/application.conf.j2
index 944f2af..067a736 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -67,6 +67,11 @@ whisk {
         throughput = 400
     }
 
+    mongodb {
+        uri         = ${?MONGODB_CONNECT_STRING}
+        database    = ${?MONGODB_DATABASE}
+    }
+
     controller {
       protocol = {{ controller.protocol }}
       https {
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala
new file mode 100644
index 0000000..24c62c1
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBArtifactStoreTests extends FlatSpec with MongoDBStoreBehaviorBase 
with ArtifactStoreBehavior {}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala
new file mode 100644
index 0000000..90770f3
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException, 
InputStream}
+
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Keep, Sink, Source, StreamConverters}
+import akka.stream.testkit.TestSubscriber
+import akka.util.ByteString
+import common.WskActorSystem
+import org.apache.commons.io.IOUtils
+import org.junit.runner.RunWith
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mongodb.scala.gridfs.helpers.AsyncStreamHelper
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.junit.JUnitRunner
+import org.scalatestplus.mockito.MockitoSugar
+
+import scala.util.Random
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBAsyncStreamGraphTests
+    extends FlatSpec
+    with Matchers
+    with ScalaFutures
+    with WskActorSystem
+    with MockitoSugar
+    with IntegrationPatience {
+
+  implicit val mat = ActorMaterializer()
+
+  behavior of "MongoDBAsyncStreamSource"
+
+  it should "read all bytes" in {
+    val bytes = randomBytes(4000)
+    val asyncStream = AsyncStreamHelper.toAsyncInputStream(bytes)
+
+    val readStream = MongoDBAsyncStreamSource(asyncStream, 
42).runWith(StreamConverters.asInputStream())
+    val readBytes = IOUtils.toByteArray(readStream)
+
+    bytes shouldBe readBytes
+  }
+
+  it should "close the stream when done" in {
+    val bytes = randomBytes(4000)
+    val inputStream = new ByteArrayInputStream(bytes)
+    val spiedStream = spy(inputStream)
+    val asyncStream = AsyncStreamHelper.toAsyncInputStream(spiedStream)
+
+    val readStream = MongoDBAsyncStreamSource(asyncStream, 
42).runWith(StreamConverters.asInputStream())
+    val readBytes = IOUtils.toByteArray(readStream)
+
+    bytes shouldBe readBytes
+    verify(spiedStream).close()
+  }
+
+  it should "onError with failure and return a failed IOResult when reading 
from failed stream" in {
+    val inputStream = mock[InputStream]
+
+    val exception = new IOException("Boom")
+    doThrow(exception).when(inputStream).read(any())
+    val asyncStream = AsyncStreamHelper.toAsyncInputStream(inputStream)
+
+    val (ioResult, p) = 
MongoDBAsyncStreamSource(asyncStream).toMat(Sink.asPublisher(false))(Keep.both).run()
+    val c = TestSubscriber.manualProbe[ByteString]()
+    p.subscribe(c)
+
+    val sub = c.expectSubscription()
+    sub.request(1)
+
+    val error = c.expectError()
+    error.getCause should be theSameInstanceAs exception
+
+    ioResult.futureValue.status.isFailure shouldBe true
+  }
+
+  behavior of "MongoDBAsyncStreamSink"
+
+  it should "write all bytes" in {
+    val bytes = randomBytes(4000)
+    val source = StreamConverters.fromInputStream(() => new 
ByteArrayInputStream(bytes), 42)
+
+    val os = new ByteArrayOutputStream()
+    val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os)
+
+    val sink = MongoDBAsyncStreamSink(asyncStream)
+    val ioResult = source.toMat(sink)(Keep.right).run()
+
+    ioResult.futureValue.count shouldBe bytes.length
+
+    val writtenBytes = os.toByteArray
+    writtenBytes shouldBe bytes
+  }
+
+  it should "close the stream when done" in {
+    val bytes = randomBytes(4000)
+    val source = StreamConverters.fromInputStream(() => new 
ByteArrayInputStream(bytes), 42)
+
+    val outputStream = new CloseRecordingStream()
+    val asyncStream = AsyncStreamHelper.toAsyncOutputStream(outputStream)
+
+    val sink = MongoDBAsyncStreamSink(asyncStream)
+    val ioResult = source.toMat(sink)(Keep.right).run()
+
+    ioResult.futureValue.count shouldBe 4000
+    outputStream.toByteArray shouldBe bytes
+    outputStream.closed shouldBe true
+  }
+
+  it should "onError with failure and return a failed IOResult when writing to 
failed stream" in {
+    val os = new ByteArrayOutputStream()
+    val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os)
+
+    val sink = MongoDBAsyncStreamSink(asyncStream)
+    val ioResult = Source(1 to 10)
+      .map { n ⇒
+        if (n == 7) throw new Error("bees!")
+        n
+      }
+      .map(ByteString(_))
+      .runWith(sink)
+    ioResult.futureValue.status.isFailure shouldBe true
+  }
+
+  private def randomBytes(size: Int): Array[Byte] = {
+    val arr = new Array[Byte](size)
+    Random.nextBytes(arr)
+    arr
+  }
+
+  private class CloseRecordingStream extends ByteArrayOutputStream {
+    var closed: Boolean = _
+    override def close() = { super.close(); closed = true }
+  }
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala
new file mode 100644
index 0000000..31eb593
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database.DocumentSerializer
+import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
+import 
org.apache.openwhisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+
+import scala.reflect.ClassTag
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBAttachmentStoreTests extends FlatSpec with 
MongoDBStoreBehaviorBase with ArtifactStoreAttachmentBehaviors {
+  override protected def getAttachmentStore[D <: DocumentSerializer: 
ClassTag]() =
+    Some(MemoryAttachmentStoreProvider.makeStore[D]())
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
new file mode 100644
index 0000000..d3dae92
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.ConfigKeys
+import 
org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehaviorBase
+import org.apache.openwhisk.core.database.{ArtifactStore, AttachmentStore, 
DocumentSerializer}
+import org.apache.openwhisk.core.entity._
+import org.scalatest.FlatSpec
+import pureconfig.loadConfigOrThrow
+import pureconfig.generic.auto._
+
+import scala.reflect.{classTag, ClassTag}
+import scala.util.Try
+
+trait MongoDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase 
{
+  override def storeType = "MongoDB"
+
+  override lazy val storeAvailableCheck: Try[Any] = storeConfigTry
+
+  val storeConfigTry = Try { 
loadConfigOrThrow[MongoDBConfig](ConfigKeys.mongodb) }
+
+  override lazy val authStore = {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    
MongoDBArtifactStoreProvider.makeArtifactStore[WhiskAuth](storeConfigTry.get, 
getAttachmentStore[WhiskAuth]())
+  }
+
+  override lazy val entityStore =
+    
MongoDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](storeConfigTry.get, 
getAttachmentStore[WhiskEntity]())(
+      classTag[WhiskEntity],
+      WhiskEntityJsonFormat,
+      WhiskDocumentReader,
+      actorSystem,
+      logging,
+      materializer)
+
+  override lazy val activationStore = {
+    implicit val docReader: DocumentReader = WhiskDocumentReader
+    MongoDBArtifactStoreProvider
+      .makeArtifactStore[WhiskActivation](storeConfigTry.get, 
getAttachmentStore[WhiskActivation]())
+  }
+
+  override protected def getAttachmentStore(store: ArtifactStore[_]) =
+    store.asInstanceOf[MongoDBArtifactStore[_]].attachmentStore
+
+  protected def getAttachmentStore[D <: DocumentSerializer: ClassTag](): 
Option[AttachmentStore] = None
+}
diff --git 
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala
 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala
new file mode 100644
index 0000000..44b121b
--- /dev/null
+++ 
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database.{UnsupportedQueryKeys, 
UnsupportedView}
+import org.apache.openwhisk.core.entity.WhiskQueries.TOP
+import org.bson.conversions.Bson
+import org.junit.runner.RunWith
+import org.mongodb.scala.MongoClient
+import org.mongodb.scala.bson.BsonDocument
+import org.mongodb.scala.bson.collection.immutable.Document
+import org.mongodb.scala.model.Filters.{equal => meq, _}
+import org.mongodb.scala.model.Sorts
+import org.scalatest.{FlatSpec, Matchers, OptionValues}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBViewMapperTests extends FlatSpec with Matchers with OptionValues {
+  implicit class RichBson(val b: Bson) {
+    def toDoc: BsonDocument = b.toBsonDocument(classOf[Document], 
MongoClient.DEFAULT_CODEC_REGISTRY)
+  }
+
+  behavior of "ActivationViewMapper filter"
+
+  it should "match all activations in namespace" in {
+    ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1"), 
List("ns1", TOP)).toDoc shouldBe
+      meq("namespace", "ns1").toDoc
+    ActivationViewMapper.filter("whisks-filters.v2.1.0", "activations", 
List("ns1"), List("ns1", TOP)).toDoc shouldBe
+      meq("_computed.nspath", "ns1").toDoc
+  }
+
+  it should "match all activations in namespace since zero" in {
+    ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1", 
0), List("ns1", TOP, TOP)).toDoc shouldBe
+      and(meq("namespace", "ns1"), gte("start", 0)).toDoc
+
+    ActivationViewMapper
+      .filter("whisks-filters.v2.1.0", "activations", List("ns1", 0), 
List("ns1", TOP, TOP))
+      .toDoc shouldBe
+      and(meq("_computed.nspath", "ns1"), gte("start", 0)).toDoc
+  }
+
+  it should "match all activations in namespace since some value" in {
+    ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1", 
42), List("ns1", TOP, TOP)).toDoc shouldBe
+      and(meq("namespace", "ns1"), gte("start", 42)).toDoc
+
+    ActivationViewMapper
+      .filter("whisks-filters.v2.1.0", "activations", List("ns1", 42), 
List("ns1", TOP, TOP))
+      .toDoc shouldBe
+      and(meq("_computed.nspath", "ns1"), gte("start", 42)).toDoc
+  }
+
+  it should "match all activations in namespace between 2 instants" in {
+    ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1", 
42), List("ns1", 314, TOP)).toDoc shouldBe
+      and(meq("namespace", "ns1"), gte("start", 42), lte("start", 314)).toDoc
+
+    ActivationViewMapper
+      .filter("whisks-filters.v2.1.0", "activations", List("ns1", 42), 
List("ns1", 314, TOP))
+      .toDoc shouldBe
+      and(meq("_computed.nspath", "ns1"), gte("start", 42), lte("start", 
314)).toDoc
+  }
+
+  it should "throw UnsupportedQueryKeys for unknown keys" in {
+    intercept[UnsupportedQueryKeys] {
+      ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1"), 
List("ns1", "foo"))
+    }
+  }
+
+  it should "throw UnsupportedView exception for unknown views" in {
+    intercept[UnsupportedView] {
+      ActivationViewMapper.filter("whisks.v2.1.0", "activation-foo", 
List("ns1"), List("ns1", TOP))
+    }
+  }
+
+  behavior of "ActivationViewMapper sort"
+
+  it should "sort descending" in {
+    ActivationViewMapper.sort("whisks-filters.v2.1.0", "activations", 
descending = true).value.toDoc shouldBe
+      Sorts.descending("_computed.nspath", "start").toDoc
+    ActivationViewMapper.sort("whisks.v2.1.0", "activations", descending = 
true).value.toDoc shouldBe
+      Sorts.descending("namespace", "start").toDoc
+  }
+
+  it should "sort ascending" in {
+    ActivationViewMapper.sort("whisks-filters.v2.1.0", "activations", 
descending = false).value.toDoc shouldBe
+      Sorts.ascending("_computed.nspath", "start").toDoc
+    ActivationViewMapper.sort("whisks.v2.1.0", "activations", descending = 
false).value.toDoc shouldBe
+      Sorts.ascending("namespace", "start").toDoc
+  }
+
+  it should "throw UnsupportedView" in {
+    intercept[UnsupportedView] {
+      ActivationViewMapper.sort("whisks.v2.1.0", "activation-foo", descending 
= true)
+    }
+  }
+
+  behavior of "WhisksViewMapper filter"
+
+  val whiskTypes = Seq(
+    ("actions", "action"),
+    ("packages", "package"),
+    ("packages-public", "package"),
+    ("rules", "rule"),
+    ("triggers", "trigger"))
+
+  it should "match entities of specific type in namespace" in {
+    whiskTypes.foreach {
+      case (view, entityType) =>
+        var filters =
+          or(
+            and(meq("entityType", entityType), meq("namespace", "ns1")),
+            and(meq("entityType", entityType), meq("_computed.rootns", "ns1")))
+        if (view == "packages-public")
+          filters = getPublicPackageFilter(filters)
+        WhisksViewMapper.filter("whisks.v2.1.0", view, List("ns1"), 
List("ns1", TOP)).toDoc shouldBe filters.toDoc
+    }
+  }
+
+  it should "match entities of specific type in namespace and updated since" 
in {
+    whiskTypes.foreach {
+      case (view, entityType) =>
+        var filters =
+          or(
+            and(meq("entityType", entityType), meq("namespace", "ns1"), 
gte("updated", 42)),
+            and(meq("entityType", entityType), meq("_computed.rootns", "ns1"), 
gte("updated", 42)))
+        if (view == "packages-public")
+          filters = getPublicPackageFilter(filters)
+        WhisksViewMapper
+          .filter("whisks.v2.1.0", view, List("ns1", 42), List("ns1", TOP, 
TOP))
+          .toDoc shouldBe filters.toDoc
+    }
+  }
+
+  it should "match all entities of specific type in namespace and between" in {
+    whiskTypes.foreach {
+      case (view, entityType) =>
+        var filters =
+          or(
+            and(meq("entityType", entityType), meq("namespace", "ns1"), 
gte("updated", 42), lte("updated", 314)),
+            and(meq("entityType", entityType), meq("_computed.rootns", "ns1"), 
gte("updated", 42), lte("updated", 314)))
+        if (view == "packages-public")
+          filters = getPublicPackageFilter(filters)
+        WhisksViewMapper
+          .filter("whisks.v2.1.0", view, List("ns1", 42), List("ns1", 314, 
TOP))
+          .toDoc shouldBe filters.toDoc
+    }
+  }
+
+  it should "match all entities in namespace" in {
+    WhisksViewMapper.filter("whisks.v2.1.0", "all", List("ns1"), List("ns1", 
TOP)).toDoc shouldBe
+      and(exists("entityType"), meq("_computed.rootns", "ns1")).toDoc
+  }
+
+  it should "throw UnsupportedQueryKeys for unknown keys" in {
+    intercept[UnsupportedQueryKeys] {
+      WhisksViewMapper.filter("whisks.v2.1.0", "actions", List("ns1"), 
List("ns1", "foo"))
+    }
+    intercept[UnsupportedQueryKeys] {
+      WhisksViewMapper.filter("whisks.v2.1.0", "all", List("ns1"), List("ns1", 
"foo"))
+    }
+  }
+
+  it should "throw UnsupportedView exception for unknown views" in {
+    intercept[UnsupportedView] {
+      WhisksViewMapper.filter("whisks.v2.1.0", "actions-foo", List("ns1"), 
List("ns1", TOP))
+    }
+  }
+
+  behavior of "WhisksViewMapper sort"
+
+  it should "sort descending" in {
+    whiskTypes.foreach {
+      case (view, _) =>
+        WhisksViewMapper.sort("whisks.v2.1.0", view, descending = 
true).value.toDoc shouldBe
+          Sorts.descending("updated").toDoc
+    }
+  }
+
+  it should "sort ascending" in {
+    whiskTypes.foreach {
+      case (view, _) =>
+        WhisksViewMapper.sort("whisks.v2.1.0", view, descending = 
false).value.toDoc shouldBe
+          Sorts.ascending("updated").toDoc
+    }
+  }
+
+  it should "throw UnsupportedView" in {
+    intercept[UnsupportedView] {
+      WhisksViewMapper.sort("whisks.v2.1.0", "action-foo", descending = true)
+    }
+  }
+
+  behavior of "SubjectViewMapper filter"
+
+  it should "match by subject or namespace" in {
+    SubjectViewMapper.filter("subjects", "identities", List("foo"), 
List("foo")).toDoc shouldBe
+      and(notEqual("blocked", true), or(meq("subject", "foo"), 
meq("namespaces.name", "foo"))).toDoc
+  }
+
+  it should "match by uuid and key" in {
+    SubjectViewMapper.filter("subjects", "identities", List("u1", "k1"), 
List("u1", "k1")).toDoc shouldBe
+      and(
+        notEqual("blocked", true),
+        or(and(meq("uuid", "u1"), meq("key", "k1")), 
and(meq("namespaces.uuid", "u1"), meq("namespaces.key", "k1")))).toDoc
+  }
+
+  it should "match by blocked or invocationsPerMinute or 
concurrentInvocations" in {
+    SubjectViewMapper
+      .filter("namespaceThrottlings", "blockedNamespaces", List("u1", "k1"), 
List("u1", "k1"))
+      .toDoc shouldBe
+      or(meq("blocked", true), meq("concurrentInvocations", 0), 
meq("invocationsPerMinute", 0)).toDoc
+  }
+
+  it should "throw exception when keys are not same" in {
+    intercept[IllegalArgumentException] {
+      SubjectViewMapper.filter("subjects", "identities", List("u1", "k1"), 
List("u1", "k2"))
+    }
+  }
+
+  it should "throw UnsupportedQueryKeys exception when keys are not know" in {
+    intercept[UnsupportedQueryKeys] {
+      SubjectViewMapper.filter("subjects", "identities", List("u1", "k1", 
"foo"), List("u1", "k1", "foo"))
+    }
+  }
+
+  it should "throw UnsupportedView exception when view is not known" in {
+    intercept[UnsupportedView] {
+      SubjectViewMapper.filter("subjects", "identities-foo", List("u1", "k1", 
"foo"), List("u1", "k1", "foo"))
+    }
+  }
+
+  behavior of "SubjectViewMapper sort"
+
+  it should "sort none" in {
+    SubjectViewMapper.sort("subjects", "identities", descending = true) 
shouldBe None
+    SubjectViewMapper.sort("namespaceThrottlings", "blockedNamespaces", 
descending = true) shouldBe None
+  }
+
+  private def getPublicPackageFilter(filters: Bson): Bson = {
+    and(meq("binding", Map.empty), meq("publish", true), filters)
+  }
+}
diff --git a/tools/build/README.md b/tools/build/README.md
index 0282482..ce5590f 100644
--- a/tools/build/README.md
+++ b/tools/build/README.md
@@ -31,6 +31,7 @@ The script is called `redo` because for most development, one will want to "redo
 - initialize environment and `docker-machine` (for mac): `redo setup prereq`
 - start CouchDB container and initialize DB with system and guest keys: `redo couchdb initdb`
 - start ElasticSearch container to store activations: `redo elasticsearch`
+- start MongoDB container as the database backend: `redo mongodb`
 - build and deploy system: `redo deploy`
 - run tests: `redo props tests`
 
diff --git a/tools/build/redo b/tools/build/redo
index 9ffa04e..b149c1a 100755
--- a/tools/build/redo
+++ b/tools/build/redo
@@ -241,6 +241,13 @@ Components = [
                   'deploy elasticsearch',
                   modes = 'clean'),
 
+    makeComponent('mongodb',
+                  'deploy mongodb',
+                  modes = 'clean'),
+
+    makeComponent('initMongoDB',
+                  'initialize mongodb with guest/system keys'),
+
     makeComponent('build',
                   'build system',
                   yaml = False,
