This is an automated email from the ASF dual-hosted git repository.
style95 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/openwhisk.git
The following commit(s) were added to refs/heads/master by this push:
new 9279626 Implement MongoDBArtifactStore (#4963)
9279626 is described below
commit 927962692e45fc7276ccb7902fd3a111daa54885
Author: jiangpch <[email protected]>
AuthorDate: Wed Jun 9 08:54:26 2021 +0800
Implement MongoDBArtifactStore (#4963)
* Implement MongoDBActivationStore
Co-authored-by: Chetan Mehrotra <[email protected]>
* Upgrade mongo-scala to 2.7.0
* Fix test
* Add license headers
* Add default value for mongodb_connect_string
* Rename graph stage classes used for mongodb gridfs
* Update readme
* Update based on comments
* Fix typo and update README
* Rename db.backend to db.artifact_store.backend
Co-authored-by: Chetan Mehrotra <[email protected]>
---
ansible/README.md | 37 ++
ansible/group_vars/all | 8 +
ansible/{tasks/db/checkDb.yml => initMongoDB.yml} | 33 +-
ansible/library/mongodb.py | 283 +++++++++
ansible/{tasks/db/checkDb.yml => mongodb.yml} | 30 +-
ansible/roles/controller/tasks/deploy.yml | 13 +
ansible/roles/invoker/tasks/deploy.yml | 13 +
.../checkDb.yml => roles/mongodb/tasks/clean.yml} | 18 +-
.../checkDb.yml => roles/mongodb/tasks/deploy.yml} | 32 +-
.../checkDb.yml => roles/mongodb/tasks/main.yml} | 21 +-
ansible/tasks/db/checkDb.yml | 4 +
ansible/templates/whisk.conf.j2 | 10 +
common/scala/build.gradle | 2 +
common/scala/src/main/resources/application.conf | 7 +
.../org/apache/openwhisk/core/WhiskConfig.scala | 1 +
.../database/mongodb/MongoDBArtifactStore.scala | 661 +++++++++++++++++++++
.../mongodb/MongoDBArtifactStoreProvider.scala | 99 +++
.../database/mongodb/MongoDBAsyncStreamSink.scala | 122 ++++
.../mongodb/MongoDBAsyncStreamSource.scala | 104 ++++
.../core/database/mongodb/MongoDBViewMapper.scala | 224 +++++++
tests/src/test/resources/application.conf.j2 | 5 +
.../mongodb/MongoDBArtifactStoreTests.scala | 26 +
.../mongodb/MongoDBAsyncStreamGraphTests.scala | 153 +++++
.../mongodb/MongoDBAttachmentStoreTests.scala | 33 +
.../mongodb/MongoDBStoreBehaviorBase.scala | 62 ++
.../database/mongodb/MongoDBViewMapperTests.scala | 256 ++++++++
tools/build/README.md | 1 +
tools/build/redo | 7 +
28 files changed, 2205 insertions(+), 60 deletions(-)
diff --git a/ansible/README.md b/ansible/README.md
index d08b688..7cc05f8 100644
--- a/ansible/README.md
+++ b/ansible/README.md
@@ -196,6 +196,43 @@ ansible-playbook -i environments/$ENVIRONMENT routemgmt.yml
- To use the API Gateway, you'll need to run `apigateway.yml` and
`routemgmt.yml`.
- Use `ansible-playbook -i environments/$ENVIRONMENT openwhisk.yml` to avoid
wiping the data store. This is useful to start OpenWhisk after restarting your
Operating System.
+### Deploying Using MongoDB
+
+You can choose MongoDB instead of CouchDB as the database backend to store
entities.
+
+- Deploy a MongoDB server (optional; for testing and development only, use an external
MongoDB server in production).
+ You need to run `pip install pymongo` first.
+
+```
+ansible-playbook -i environments/<environment> mongodb.yml -e
mongodb_data_volume="/tmp/mongo-data"
+```
+
+- Then execute
+
+```
+cd <openwhisk_home>
+./gradlew distDocker
+cd ansible
+ansible-playbook -i environments/<environment> initMongoDB.yml -e
mongodb_connect_string="mongodb://172.17.0.1:27017"
+ansible-playbook -i environments/<environment> apigateway.yml -e
mongodb_connect_string="mongodb://172.17.0.1:27017"
+ansible-playbook -i environments/<environment> openwhisk.yml -e
mongodb_connect_string="mongodb://172.17.0.1:27017" -e
db_artifact_backend="MongoDB"
+
+# installs a catalog of public packages and actions
+ansible-playbook -i environments/<environment> postdeploy.yml
+
+# to use the API gateway
+ansible-playbook -i environments/<environment> apigateway.yml
+ansible-playbook -i environments/<environment> routemgmt.yml
+```
+
+Available parameters for ansible are
+```
+ mongodb:
+ connect_string: "{{ mongodb_connect_string }}"
+ database: "{{ mongodb_database | default('whisks') }}"
+ data_volume: "{{ mongodb_data_volume | default('mongo-data') }}"
+```
+
### Using ElasticSearch to Store Activations
You can use ElasticSearch (ES) to store activations separately while other
entities remain stored in CouchDB. There is an Ansible playbook to setup a
simple ES cluster for testing and development purposes.
diff --git a/ansible/group_vars/all b/ansible/group_vars/all
index 1467639..79112a9 100644
--- a/ansible/group_vars/all
+++ b/ansible/group_vars/all
@@ -278,6 +278,8 @@ db:
invoker:
user: "{{ db_invoker_user | default(lookup('ini', 'db_username
section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
pass: "{{ db_invoker_pass | default(lookup('ini', 'db_password
section=invoker file={{ playbook_dir }}/db_local.ini')) }}"
+ artifact_store:
+ backend: "{{ db_artifact_backend | default('CouchDB') }}"
activation_store:
backend: "{{ db_activation_backend | default('CouchDB') }}"
elasticsearch:
@@ -299,6 +301,10 @@ db:
admin:
username: "{{ elastic_username | default('admin') }}"
password: "{{ elastic_password | default('admin') }}"
+ mongodb:
+ connect_string: "{{ mongodb_connect_string |
default('mongodb://172.17.0.1:27017') }}"
+ database: "{{ mongodb_database | default('whisks') }}"
+ data_volume: "{{ mongodb_data_volume | default('mongo-data') }}"
apigateway:
port:
@@ -326,6 +332,8 @@ elasticsearch_connect_string: "{% set ret = [] %}\
{{ ret.append( hostvars[host].ansible_host +
':' + ((db.elasticsearch.port+loop.index-1)|string) ) }}\
{% endfor %}\
{{ ret | join(',') }}"
+mongodb:
+ version: 4.4.0
docker:
# The user to install docker for. Defaults to the ansible user if not set.
This will be the user who is able to run
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/initMongoDB.yml
similarity index 53%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/initMongoDB.yml
index 5962400..58672a6 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/initMongoDB.yml
@@ -15,16 +15,25 @@
# limitations under the License.
#
---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This playbook will initialize the immortal DBs in the database account.
+# This step is usually done only once per deployment.
-- name: check if {{ dbName }} with {{ db.provider }} exists
- uri:
- url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
- method: HEAD
- status_code: 200
- user: "{{ dbUser }}"
- password: "{{ dbPass }}"
- force_basic_auth: yes
+- hosts: ansible
+ tasks:
+ - name: create necessary auth keys
+ mongodb:
+ connect_string: "{{ db.mongodb.connect_string }}"
+ database: "{{ db.mongodb.database }}"
+ collection: "whiskauth"
+ doc:
+ _id: "{{ item }}"
+ subject: "{{ item }}"
+ namespaces:
+ - name: "{{ item }}"
+ uuid: "{{ key.split(':')[0] }}"
+ key: "{{ key.split(':')[1] }}"
+ mode: "doc"
+ force_update: True
+ vars:
+ key: "{{ lookup('file', 'files/auth.{{ item }}') }}"
+ with_items: "{{ db.authkeys }}"
diff --git a/ansible/library/mongodb.py b/ansible/library/mongodb.py
new file mode 100644
index 0000000..fb610fa
--- /dev/null
+++ b/ansible/library/mongodb.py
@@ -0,0 +1,283 @@
+#!/usr/bin/python
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from __future__ import absolute_import, division, print_function
+__metaclass__ = type
+
+
+DOCUMENTATION = '''
+---
+module: mongodb
+short_description: A module which support some simple operations on MongoDB.
+description:
+ - Including add user/insert document/create indexes in MongoDB
+options:
+ connect_string:
+ description:
+ - The uri of mongodb server
+ required: true
+ database:
+ description:
+ - The name of the database you want to manipulate
+ required: true
+ user:
+ description:
+ - The name of the user to add or remove, required when use 'user'
mode
+ required: false
+ default: null
+ password:
+ description:
+ - The password to use for the user, required when use 'user' mode
+ required: false
+ default: null
+ roles:
+ description:
+ - The roles of the user, it's a list of dict, each dict requires
two fields: 'db' and 'role', required when use 'user' mode
+ required: false
+ default: null
+ collection:
+ required: false
+ description:
+ - The name of the collection you want to manipulate, required when
use 'doc' or 'index' mode
+ doc:
+ required: false
+ description:
+ - The document you want to insert into MongoDB, required when use
'doc' mode
+ indexes:
+ required: false
+ description:
+ - The indexes you want to create in MongoDB, it's a list of dict,
you can see the example for the usage, required when use 'index' mode
+ force_update:
+ required: false
+ description:
+ - Whether replace/update existing user or doc or raise
DuplicateKeyError, default is false
+ mode:
+ required: false
+ default: user
+ choices: ['user', 'doc', 'index']
+ description:
+ - use 'user' mode if you want to add user, 'doc' mode to insert
document, 'index' mode to create indexes
+
+requirements: [ "pymongo" ]
+author:
+ - "Jiang PengCheng"
+'''
+
+EXAMPLES = '''
+# add user
+- mongodb:
+ connect_string: mongodb://localhost:27017
+ database: admin
+ user: test
+ password: 123456
+ roles:
+ - db: test_database
+ role: read
+ force_update: true
+
+# add doc
+- mongodb:
+ connect_string: mongodb://localhost:27017
+ mode: doc
+ database: admin
+ collection: main
+ doc:
+ id: "id/document"
+ title: "the name of document"
+ content: "which doesn't matter"
+ force_update: true
+
+# add indexes
+- mongodb:
+ connect_string: mongodb://localhost:27017
+ mode: index
+ database: admin
+ collection: main
+ indexes:
+ - index:
+ - field: updated_at
+ direction: 1
+ - field: name
+ direction: -1
+ name: test-index
+ unique: true
+'''
+
+import traceback
+
+from ansible.module_utils.basic import AnsibleModule
+from ansible.module_utils._text import to_native
+
+try:
+ from pymongo import ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE,
HASHED, TEXT
+ from pymongo import IndexModel
+ from pymongo import MongoClient
+ from pymongo.errors import DuplicateKeyError
+except ImportError:
+ pass
+
+
+# =========================================
+# MongoDB module specific support methods.
+#
+
+class UnknownIndexPlugin(Exception):
+ pass
+
+
+def check_params(params, mode, module):
+ missed_params = []
+ for key in OPERATIONS[mode]['required']:
+ if params[key] is None:
+ missed_params.append(key)
+
+ if missed_params:
+ module.fail_json(msg="missing required arguments: %s" %
(",".join(missed_params)))
+
+
+def _recreate_user(module, db, user, password, roles):
+ try:
+ db.command("dropUser", user)
+ db.command("createUser", user, pwd=password, roles=roles)
+ except Exception as e:
+ module.fail_json(msg='Unable to create user: %s' % to_native(e),
exception=traceback.format_exc())
+
+
+
+def user(module, client, db_name, **kwargs):
+ roles = kwargs['roles']
+ if roles is None:
+ roles = []
+ db = client[db_name]
+
+ try:
+ db.command("createUser", kwargs['user'], pwd=kwargs['password'],
roles=roles)
+ except DuplicateKeyError as e:
+ if kwargs['force_update']:
+ _recreate_user(module, db, kwargs['user'], kwargs['password'],
roles)
+ else:
+ module.fail_json(msg='Unable to create user: %s' % to_native(e),
exception=traceback.format_exc())
+ except Exception as e:
+ module.fail_json(msg='Unable to create user: %s' % to_native(e),
exception=traceback.format_exc())
+
+ module.exit_json(changed=True, user=kwargs['user'])
+
+
+def doc(module, client, db_name, **kwargs):
+ coll = client[db_name][kwargs['collection']]
+ try:
+ coll.insert_one(kwargs['doc'])
+ except DuplicateKeyError as e:
+ if kwargs['force_update']:
+ try:
+ coll.replace_one({'_id': kwargs['doc']['_id']}, kwargs['doc'])
+ except Exception as e:
+ module.fail_json(msg='Unable to insert doc: %s' %
to_native(e), exception=traceback.format_exc())
+ else:
+ module.fail_json(msg='Unable to insert doc: %s' % to_native(e),
exception=traceback.format_exc())
+ except Exception as e:
+ module.fail_json(msg='Unable to insert doc: %s' % to_native(e),
exception=traceback.format_exc())
+
+ kwargs['doc']['_id'] = str(kwargs['doc']['_id'])
+ module.exit_json(changed=True, doc=kwargs['doc'])
+
+
+def _clean_index_direction(direction):
+ if direction in ["1", "-1"]:
+ direction = int(direction)
+
+ if direction not in [ASCENDING, DESCENDING, GEO2D, GEOHAYSTACK, GEOSPHERE,
HASHED, TEXT]:
+ raise UnknownIndexPlugin("Unable to create indexes: Unknown index
plugin: %s" % direction)
+ return direction
+
+
+def _clean_index_options(options):
+ res = {}
+ supported_options = set(['name', 'unique', 'background', 'sparse',
'bucketSize', 'min', 'max', 'expireAfterSeconds'])
+ for key in set(options.keys()).intersection(supported_options):
+ res[key] = options[key]
+ if key in ['min', 'max', 'bucketSize', 'expireAfterSeconds']:
+ res[key] = int(res[key])
+
+ return res
+
+
+def parse_indexes(idx):
+ keys = [(k['field'], _clean_index_direction(k['direction'])) for k in
idx.pop('index')]
+ options = _clean_index_options(idx)
+ return IndexModel(keys, **options)
+
+
+def index(module, client, db_name, **kwargs):
+ parsed_indexes = map(parse_indexes, kwargs['indexes'])
+ try:
+ coll = client[db_name][kwargs['collection']]
+ coll.create_indexes(parsed_indexes)
+ except Exception as e:
+ module.fail_json(msg='Unable to create indexes: %s' % to_native(e),
exception=traceback.format_exc())
+
+ module.exit_json(changed=True, indexes=kwargs['indexes'])
+
+
+OPERATIONS = {
+ 'user': { 'function': user, 'params': ['user', 'password', 'roles',
'force_update'], 'required': ['user', 'password']},
+ 'doc': {'function': doc, 'params': ['doc', 'collection', 'force_update'],
'required': ['doc', 'collection']},
+ 'index': {'function': index, 'params': ['indexes', 'collection'],
'required': ['indexes', 'collection']}
+}
+
+
+# =========================================
+# Module execution.
+#
+
+def main():
+ module = AnsibleModule(
+ argument_spec=dict(
+ connect_string=dict(required=True),
+ database=dict(required=True, aliases=['db']),
+ mode=dict(default='user', choices=['user', 'doc', 'index']),
+ user=dict(default=None),
+ password=dict(default=None, no_log=True),
+ roles=dict(default=None, type='list'),
+ collection=dict(default=None),
+ doc=dict(default=None, type='dict'),
+ force_update=dict(default=False, type='bool'),
+ indexes=dict(default=None, type='list'),
+ )
+ )
+
+ mode = module.params['mode']
+
+ db_name = module.params['database']
+
+ params = {key: module.params[key] for key in OPERATIONS[mode]['params']}
+ check_params(params, mode, module)
+
+ try:
+ client = MongoClient(module.params['connect_string'])
+ except NameError:
+ module.fail_json(msg='the python pymongo module is required')
+ except Exception as e:
+ module.fail_json(msg='unable to connect to database: %s' %
to_native(e), exception=traceback.format_exc())
+
+ OPERATIONS[mode]['function'](module, client, db_name, **params)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/mongodb.yml
similarity index 60%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/mongodb.yml
index 5962400..2a0b4f6 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/mongodb.yml
@@ -15,16 +15,22 @@
# limitations under the License.
#
---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This playbook deploys a MongoDB server for OpenWhisk.
-- name: check if {{ dbName }} with {{ db.provider }} exists
- uri:
- url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
- method: HEAD
- status_code: 200
- user: "{{ dbUser }}"
- password: "{{ dbPass }}"
- force_basic_auth: yes
+- hosts: localhost
+ tasks:
+ - name: check if db_local.ini exists?
+ tags: ini
+ stat: path="{{ playbook_dir }}/db_local.ini"
+ register: db_check
+
+ - name: prepare db_local.ini
+ tags: ini
+ local_action: template src="db_local.ini.j2" dest="{{ playbook_dir
}}/db_local.ini"
+ when: not db_check.stat.exists
+
+# This is for testing only, so it is deployed on the first node only; please use a sharded
MongoDB cluster for
+# production environments.
+- hosts: db[0]
+ roles:
+ - mongodb
diff --git a/ansible/roles/controller/tasks/deploy.yml
b/ansible/roles/controller/tasks/deploy.yml
index 21f33a6..931d657 100644
--- a/ansible/roles/controller/tasks/deploy.yml
+++ b/ansible/roles/controller/tasks/deploy.yml
@@ -296,6 +296,19 @@
env: "{{ env | combine(elastic_env) }}"
when: db.activation_store.backend == "ElasticSearch"
+- name: setup mongodb artifact store env
+ set_fact:
+ mongodb_env:
+ "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}"
+ "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}"
+ "CONFIG_whisk_spi_ArtifactStoreProvider":
"org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider"
+ when: db.artifact_store.backend == "MongoDB"
+
+- name: merge mongodb artifact store env
+ set_fact:
+ env: "{{ env | combine(mongodb_env) }}"
+ when: db.artifact_store.backend == "MongoDB"
+
- name: populate volumes for controller
set_fact:
controller_volumes:
diff --git a/ansible/roles/invoker/tasks/deploy.yml
b/ansible/roles/invoker/tasks/deploy.yml
index 591a07e..28d8ead 100644
--- a/ansible/roles/invoker/tasks/deploy.yml
+++ b/ansible/roles/invoker/tasks/deploy.yml
@@ -315,6 +315,19 @@
env: "{{ env | combine(elastic_env) }}"
when: db.activation_store.backend == "ElasticSearch"
+- name: setup mongodb artifact store env
+ set_fact:
+ mongodb_env:
+ "CONFIG_whisk_mongodb_uri": "{{ db.mongodb.connect_string }}"
+ "CONFIG_whisk_mongodb_database": "{{ db.mongodb.database }}"
+ "CONFIG_whisk_spi_ArtifactStoreProvider":
"org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider"
+ when: db.artifact_store.backend == "MongoDB"
+
+- name: merge mongodb artifact store env
+ set_fact:
+ env: "{{ env | combine(mongodb_env) }}"
+ when: db.artifact_store.backend == "MongoDB"
+
- name: include plugins
include_tasks: "{{ inv_item }}.yml"
with_items: "{{ invoker_plugins | default([]) }}"
diff --git a/ansible/tasks/db/checkDb.yml
b/ansible/roles/mongodb/tasks/clean.yml
similarity index 63%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/roles/mongodb/tasks/clean.yml
index 5962400..248ee55 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/roles/mongodb/tasks/clean.yml
@@ -15,16 +15,10 @@
# limitations under the License.
#
---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# Remove MongoDB server
-- name: check if {{ dbName }} with {{ db.provider }} exists
- uri:
- url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
- method: HEAD
- status_code: 200
- user: "{{ dbUser }}"
- password: "{{ dbPass }}"
- force_basic_auth: yes
+- name: remove MongoDB
+ docker_container:
+ name: mongodb
+ state: absent
+ keep_volumes: False
diff --git a/ansible/tasks/db/checkDb.yml
b/ansible/roles/mongodb/tasks/deploy.yml
similarity index 53%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/roles/mongodb/tasks/deploy.yml
index 5962400..14bf146 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/roles/mongodb/tasks/deploy.yml
@@ -15,16 +15,24 @@
# limitations under the License.
#
---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This role will run a MongoDB server on the db group. This is only for testing;
please use
+# a sharded cluster for production environments.
-- name: check if {{ dbName }} with {{ db.provider }} exists
- uri:
- url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
- method: HEAD
- status_code: 200
- user: "{{ dbUser }}"
- password: "{{ dbPass }}"
- force_basic_auth: yes
+- name: (re)start mongodb
+ vars:
+ mongodb_image: "{{ mongodb.docker_image | default('mongo:' ~
mongodb.version ) }}"
+ docker_container:
+ name: mongodb
+ image: "{{ mongodb_image }}"
+ state: started
+ recreate: true
+ restart_policy: "{{ docker.restart.policy }}"
+ hostname: "mongodb"
+ user: "mongodb"
+ volumes:
+ - "{{ db.mongodb.data_volume }}:/data/db"
+ ports:
+ - "27017:27017"
+
+- name: wait until the MongoDB in this host is up and running
+ local_action: wait_for host={{ ansible_host }} port=27017 state=started
delay=5 timeout=60
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/roles/mongodb/tasks/main.yml
similarity index 63%
copy from ansible/tasks/db/checkDb.yml
copy to ansible/roles/mongodb/tasks/main.yml
index 5962400..2d27a9c 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/roles/mongodb/tasks/main.yml
@@ -15,16 +15,13 @@
# limitations under the License.
#
---
-# Checks, that the Database exists
-# dbName - name of the database to check
-# dbUser - name of the user which should have access rights
-# dbPass - password of the user which should have access
+# This role will deploy a database server. Use the role if you want to use
MongoDB locally.
+# In deploy mode it will start the MongoDB container.
+# In clean mode it will remove the MongoDB container.
+
+- import_tasks: deploy.yml
+ when: mode == "deploy"
+
+- import_tasks: clean.yml
+ when: mode == "clean"
-- name: check if {{ dbName }} with {{ db.provider }} exists
- uri:
- url: "{{ db.protocol }}://{{ db.host }}:{{ db.port }}/{{ dbName }}"
- method: HEAD
- status_code: 200
- user: "{{ dbUser }}"
- password: "{{ dbPass }}"
- force_basic_auth: yes
diff --git a/ansible/tasks/db/checkDb.yml b/ansible/tasks/db/checkDb.yml
index 5962400..bb78a66 100644
--- a/ansible/tasks/db/checkDb.yml
+++ b/ansible/tasks/db/checkDb.yml
@@ -28,3 +28,7 @@
user: "{{ dbUser }}"
password: "{{ dbPass }}"
force_basic_auth: yes
+ when: db.artifact_store.backend == "CouchDB"
+
+# the collection in MongoDB doesn't need to be created in advance, so just
skip it
+# - name: check if {{ dbName }} on MongoDB exists
diff --git a/ansible/templates/whisk.conf.j2 b/ansible/templates/whisk.conf.j2
index 9bb9d53..c6ef559 100644
--- a/ansible/templates/whisk.conf.j2
+++ b/ansible/templates/whisk.conf.j2
@@ -14,4 +14,14 @@ whisk {
WhiskActivation = "{{ db.whisk.activations }}"
}
}
+ {% if db.artifact_store.backend == 'MongoDB' %}
+ mongodb {
+ uri = "{{ db.mongodb.connect_string }}"
+ database = "{{ db.mongodb.database }}"
+ }
+
+ spi {
+ ArtifactStoreProvider =
org.apache.openwhisk.core.database.mongodb.MongoDBArtifactStoreProvider
+ }
+ {% endif %}
}
diff --git a/common/scala/build.gradle b/common/scala/build.gradle
index 8211f55..1dddd97 100644
--- a/common/scala/build.gradle
+++ b/common/scala/build.gradle
@@ -90,6 +90,8 @@ dependencies {
compile "com.microsoft.azure:azure-cosmosdb:2.6.2"
compile
"com.sksamuel.elastic4s:elastic4s-http_${gradle.scala.depVersion}:6.7.4"
+ //for mongo
+ compile
"org.mongodb.scala:mongo-scala-driver_${gradle.scala.depVersion}:2.7.0"
compile
("com.lightbend.akka:akka-stream-alpakka-s3_${gradle.scala.depVersion}:1.1.2") {
exclude group: 'org.apache.httpcomponents' //Not used as alpakka uses
akka-http
diff --git a/common/scala/src/main/resources/application.conf
b/common/scala/src/main/resources/application.conf
index 287b5c3..7600638 100644
--- a/common/scala/src/main/resources/application.conf
+++ b/common/scala/src/main/resources/application.conf
@@ -381,6 +381,13 @@ whisk {
#}
}
+ # MongoDB related configuration
+ # For example:
+ # mongodb {
+ # uri = mongodb://localhost:27017 # DB Uri
+ # database = # Database name
+ #}
+
# transaction ID related configuration
transactions {
header = "X-Request-ID"
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
index 1755abe..00d876e 100644
--- a/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
+++ b/common/scala/src/main/scala/org/apache/openwhisk/core/WhiskConfig.scala
@@ -208,6 +208,7 @@ object ConfigKeys {
val couchdb = "whisk.couchdb"
val cosmosdb = "whisk.cosmosdb"
+ val mongodb = "whisk.mongodb"
val kafka = "whisk.kafka"
val kafkaCommon = s"$kafka.common"
val kafkaProducer = s"$kafka.producer"
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala
new file mode 100644
index 0000000..bab4a1f
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStore.scala
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import akka.actor.ActorSystem
+import akka.event.Logging.ErrorLevel
+import akka.http.scaladsl.model._
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl._
+import akka.util.ByteString
+import com.mongodb.client.gridfs.model.GridFSUploadOptions
+import org.apache.openwhisk.common.{Logging, LoggingMarkers, TransactionId}
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.database.StoreUtils._
+import org.apache.openwhisk.core.entity.Attachments.Attached
+import org.apache.openwhisk.core.entity.{DocId, DocInfo, DocRevision,
DocumentReader, UUID}
+import org.apache.openwhisk.http.Messages
+import org.bson.json.{JsonMode, JsonWriterSettings}
+import org.mongodb.scala.bson.BsonString
+import org.mongodb.scala.bson.collection.immutable.Document
+import org.mongodb.scala.gridfs.{GridFSBucket, GridFSFile,
MongoGridFSException}
+import org.mongodb.scala.model._
+import org.mongodb.scala.{MongoClient, MongoCollection, MongoException}
+import spray.json._
+
+import scala.concurrent.Future
+import scala.util.Try
+
+// Constants shared with the MongoDBArtifactStore class, which imports them via `import MongoDBArtifactStore._`.
+object MongoDBArtifactStore {
+ // Name of the extra field under which `put` stores documentHandler.computedFields(...);
+ // `query` removes it again before passing a document to the document handler.
+ val _computed = "_computed"
+}
+
+/**
+ * Basic client to put and delete artifacts in a data store.
+ *
+ * @param client the mongodb client to access database
+ * @param dbName the name of the database to operate on
+ * @param collName the name of the collection to operate on
+ * @param documentHandler helper class help to simulate the designDoc of
CouchDB
+ * @param viewMapper helper class help to simulate the designDoc of CouchDB
+ */
+class MongoDBArtifactStore[DocumentAbstraction <: DocumentSerializer](client:
MongoClient,
+ dbName:
String,
+
collName: String,
+
documentHandler: DocumentHandler,
+
viewMapper: MongoDBViewMapper,
+ val
inliningConfig: InliningConfig,
+ val
attachmentStore: Option[AttachmentStore])(
+ implicit system: ActorSystem,
+ val logging: Logging,
+ jsonFormat: RootJsonFormat[DocumentAbstraction],
+ val materializer: ActorMaterializer,
+ docReader: DocumentReader)
+ extends ArtifactStore[DocumentAbstraction]
+ with DocumentProvider
+ with DefaultJsonProtocol
+ with AttachmentSupport[DocumentAbstraction] {
+
+ import MongoDBArtifactStore._
+
+ protected[core] implicit val executionContext = system.dispatcher
+
+ private val mongodbScheme = "mongodb"
+ val attachmentScheme: String =
attachmentStore.map(_.scheme).getOrElse(mongodbScheme)
+
+ private val database = client.getDatabase(dbName)
+ private val collection = getCollectionAndCreateIndexes()
+ private val gridFSBucket = GridFSBucket(database, collName)
+
+ private val jsonWriteSettings =
JsonWriterSettings.builder().outputMode(JsonMode.RELAXED).build
+
+ // MongoDB doesn't support using `$` as the first char of a field name, so
+ // the two fields below need to be encoded first
+ private val fieldsNeedEncode = Seq("annotations", "parameters")
+
+ /**
+  * Saves a document and returns its id together with the locally calculated new revision.
+  *
+  * The write is a single findOneAndReplace with upsert enabled. The filter only matches when the
+  * stored `_rev` equals the revision carried by `d` (or, for a first revision, when the stored
+  * document has no `_rev`), so a stale write falls through to an insert that collides on `_id`;
+  * that duplicate key error (E11000) is reported as a DocumentConflictException.
+  *
+  * @param d the document to store; its record must contain a non-empty `_id`
+  * @return the id and new revision of the stored document
+  */
+ override protected[database] def put(d: DocumentAbstraction)(implicit
transid: TransactionId): Future[DocInfo] = {
+ val asJson = d.toDocumentRecord
+
+ val id: String = asJson.fields.getOrElse("_id",
JsString.empty).convertTo[String].trim
+ require(!id.isEmpty, "document id must be defined")
+
+ val (old_rev, rev) = revisionCalculate(asJson)
+ val docinfoStr = s"id: $id, rev: $rev"
+ val start =
+ transid.started(this, LoggingMarkers.DATABASE_SAVE, s"[PUT] '$collName'
saving document: '$docinfoStr'")
+
+ val encodedData = encodeFields(fieldsNeedEncode, asJson)
+
+ // stored record = encoded document + computed fields + the new revision
+ val data = JsObject(
+ encodedData.fields + (_computed ->
documentHandler.computedFields(asJson)) + ("_rev" -> rev.toJson))
+
+ val filters =
+ if (rev.startsWith("1-")) {
+ // for new document, we should get no matched document and insert new
one
+ // if there is a matched document, that one with no _rev field will be
replaced
+ // if there is a document with the same id but has an _rev field, will
return an E11000(conflict) error
+ Filters.and(Filters.eq("_id", id), Filters.not(Filters.exists("_rev")))
+ } else {
+ // for old document, we should find a matched document and replace it
+ // if no matched document is found and we try to insert a new one, mongodb
will return an E11000 error
+ Filters.and(Filters.eq("_id", id), Filters.eq("_rev", old_rev))
+ }
+
+ val f =
+ collection
+ .findOneAndReplace(
+ filters,
+ Document(data.compactPrint),
+
FindOneAndReplaceOptions().upsert(true).returnDocument(ReturnDocument.AFTER))
+ .toFuture()
+ .map { doc =>
+ transid.finished(this, start, s"[PUT] '$collName' completed
document: '$docinfoStr', document: '$doc'")
+ DocInfo(DocId(id), DocRevision(rev))
+ }
+ .recover {
+ // E11000 means a duplicate key error
+ case t: MongoException if t.getCode == 11000 =>
+ transid.finished(this, start, s"[PUT] '$dbName', document:
'$docinfoStr'; conflict.")
+ throw DocumentConflictException("conflict on 'put'")
+ case t: MongoException =>
+ transid.failed(
+ this,
+ start,
+ s"[PUT] '$dbName' failed to put document: '$docinfoStr'; return
error code: '${t.getCode}'",
+ ErrorLevel)
+ throw new Exception("Unexpected mongodb server error: " +
t.getMessage)
+ }
+
+ reportFailure(
+ f,
+ failure =>
+ transid
+ .failed(this, start, s"[PUT] '$collName' internal error, failure:
'${failure.getMessage}'", ErrorLevel))
+ }
+
+ /**
+  * Deletes the document whose `_id` and `_rev` both match `doc`.
+  *
+  * When nothing was deleted, a follow-up lookup by `_id` tells the two failure cases apart:
+  * the document exists with another revision (DocumentConflictException) or does not exist
+  * at all (NoDocumentException).
+  *
+  * @param doc id and revision of the document to delete; the revision is required
+  * @return a future completed with `true` once the document has been deleted
+  */
+ override protected[database] def del(doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] = {
+   require(doc != null && doc.rev.asString != null, "doc revision required for delete")
+
+   val start =
+     transid.started(this, LoggingMarkers.DATABASE_DELETE, s"[DEL] '$collName' deleting document: '$doc'")
+
+   val f = collection
+     .deleteOne(Filters.and(Filters.eq("_id", doc.id.id), Filters.eq("_rev", doc.rev.rev)))
+     .toFuture()
+     .flatMap { deleteResult =>
+       if (deleteResult.getDeletedCount == 1) { // the result can only be 1 or 0
+         transid.finished(this, start, s"[DEL] '$collName' completed document: '$doc'")
+         // the value is already known, so there is no need to schedule a task on the dispatcher
+         Future.successful(true)
+       } else {
+         collection.find(Filters.eq("_id", doc.id.id)).toFuture.map { existing =>
+           if (existing.size == 1) {
+             // find the document according to _id, conflict
+             transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; conflict.")
+             throw DocumentConflictException("conflict on 'delete'")
+           } else {
+             // doesn't find the document according to _id, not found
+             transid.finished(this, start, s"[DEL] '$collName', document: '$doc'; not found.")
+             throw NoDocumentException(s"$doc not found on 'delete'")
+           }
+         }
+       }
+     }
+     .recover {
+       case t: MongoException =>
+         transid.failed(
+           this,
+           start,
+           s"[DEL] '$collName' failed to delete document: '$doc'; error code: '${t.getCode}'",
+           ErrorLevel)
+         throw new Exception("Unexpected mongodb server error: " + t.getMessage)
+     }
+
+   reportFailure(
+     f,
+     failure =>
+       transid.failed(
+         this,
+         start,
+         s"[DEL] '$collName' internal error, doc: '$doc', failure: '${failure.getMessage}'",
+         ErrorLevel))
+ }
+
+ /**
+  * Fetches the document with the id in `doc` and deserializes it to `A`.
+  *
+  * The lookup is by `_id` only; the revision is checked by `deserialize`.
+  *
+  * @param doc id (and optionally revision) of the document to read
+  * @param attachmentHandler optional callback used to set the attachment metadata found in the
+  *                          stored JSON on the deserialized document
+  * @return the deserialized document; the future fails with NoDocumentException when no document
+  *         has that id and with DocumentUnreadable when the stored JSON cannot be deserialized
+  */
+ override protected[database] def get[A <: DocumentAbstraction](doc: DocInfo,
+                                                                attachmentHandler: Option[(A, Attached) => A] = None)(
+   implicit transid: TransactionId,
+   ma: Manifest[A]): Future[A] = {
+
+   // validate first so that a bad argument does not leave a started marker without a matching end
+   require(doc != null, "doc undefined")
+
+   val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET] '$dbName' finding document: '$doc'")
+
+   val f = collection
+     .find(Filters.eq("_id", doc.id.id)) // method deserialize will check whether the _rev matched
+     .toFuture()
+     .map(result =>
+       if (result.isEmpty) {
+         transid.finished(this, start, s"[GET] '$collName', document: '$doc'; not found.")
+         throw NoDocumentException("not found on 'get'")
+       } else {
+         transid.finished(this, start, s"[GET] '$collName' completed: found document '$doc'")
+         val response = result.head.toJson(jsonWriteSettings).parseJson.asJsObject
+         val decodeData = decodeFields(fieldsNeedEncode, response)
+
+         val deserializedDoc = deserialize[A, DocumentAbstraction](doc, decodeData)
+         attachmentHandler
+           .map(processAttachments(deserializedDoc, decodeData, doc.id.id, _))
+           .getOrElse(deserializedDoc)
+     })
+     // every branch rethrows, so `recover` is sufficient (no Future is produced here)
+     .recover {
+       case t: MongoException =>
+         // a server error is a failure, reported the same way as in the other operations
+         transid.failed(
+           this,
+           start,
+           s"[GET] '$collName' failed to get document: '$doc'; error code: '${t.getCode}'",
+           ErrorLevel)
+         throw new Exception("Unexpected mongodb server error: " + t.getMessage)
+       case _: DeserializationException => throw DocumentUnreadable(Messages.corruptedEntity)
+     }
+
+   reportFailure(
+     f,
+     failure =>
+       transid.failed(
+         this,
+         start,
+         s"[GET] '$collName' internal error, doc: '$doc', failure: '${failure.getMessage}'",
+         ErrorLevel))
+ }
+
+ /**
+  * Looks up the stored JSON of the document with the given id, without deserializing it to an
+  * entity and without any revision check.
+  *
+  * @param id the id of the document to read
+  * @return Some(json) with the encoded fields decoded again, or None when the lookup yields null
+  */
+ override protected[database] def get(id: DocId)(implicit transid:
TransactionId): Future[Option[JsObject]] = {
+ val start = transid.started(this, LoggingMarkers.DATABASE_GET, s"[GET]
'$collName' finding document: '$id'")
+ val f = collection
+ .find(Filters.equal("_id", id.id))
+ .head()
+ .map {
+ case d: Document =>
+ transid.finished(this, start, s"[GET] '$dbName' completed: found
document '$id'")
+ Some(decodeFields(fieldsNeedEncode,
d.toJson(jsonWriteSettings).parseJson.asJsObject))
+ // a null result is treated as "no document with this id"
+ case null =>
+ transid.finished(this, start, s"[GET] '$dbName', document: '$id';
not found.")
+ None
+ }
+ .recover {
+ case t: MongoException =>
+ transid.failed(
+ this,
+ start,
+ s"[GET] '$collName' failed to get document: '$id'; error code:
'${t.getCode}'",
+ ErrorLevel)
+ throw new Exception("Unexpected mongodb server error: " +
t.getMessage)
+ }
+
+ reportFailure(
+ f,
+ failure =>
+ transid.failed(
+ this,
+ start,
+ s"[GET] '$collName' internal error, doc: '$id', failure:
'${failure.getMessage}'",
+ ErrorLevel))
+ }
+
+ /**
+  * Runs a view-style query against the collection.
+  *
+  * `table` has the form `<ddoc>/<viewName>`; the view mapper translates it together with the key
+  * range into a filter and an optional sort. Each matching document is passed, without its
+  * `_computed` field, to the document handler, which produces the view rows.
+  *
+  * @param table the view to query, as `<ddoc>/<viewName>`
+  * @param startKey lower bound of the key range, interpreted by the view mapper
+  * @param endKey upper bound of the key range, interpreted by the view mapper
+  * @param skip number of matching documents to skip; must not be negative
+  * @param limit maximum number of documents to fetch; must not be negative
+  * @param includeDocs whether full documents are requested (may also be forced by the handler)
+  * @param descending sort direction handed to the view mapper
+  * @param reduce must be false, reduce is not supported
+  * @param stale not used by this implementation
+  * @return the view rows produced by the document handler
+  */
+ override protected[core] def query(table: String,
+                                    startKey: List[Any],
+                                    endKey: List[Any],
+                                    skip: Int,
+                                    limit: Int,
+                                    includeDocs: Boolean,
+                                    descending: Boolean,
+                                    reduce: Boolean,
+                                    stale: StaleParameter)(implicit transid: TransactionId): Future[List[JsObject]] = {
+   require(!(reduce && includeDocs), "reduce and includeDocs cannot both be true")
+   require(!reduce, "Reduce scenario not supported") //TODO Investigate reduce
+   require(skip >= 0, "skip should be non negative")
+   require(limit >= 0, "limit should be non negative")
+
+   val Array(ddoc, viewName) = table.split("/")
+
+   // Chain on the observables returned by sort/skip/limit instead of discarding them, so the
+   // query does not depend on those methods modifying the original observable in place
+   val filtered = collection.find(viewMapper.filter(ddoc, viewName, startKey, endKey))
+   val find = viewMapper
+     .sort(ddoc, viewName, descending)
+     .foldLeft(filtered)((observable, order) => observable.sort(order))
+     .skip(skip)
+     .limit(limit)
+
+   val realIncludeDocs = includeDocs || documentHandler.shouldAlwaysIncludeDocs(ddoc, viewName)
+   val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[QUERY] '$collName' searching '$table")
+
+   val f = find
+     .toFuture()
+     .map { docs =>
+       transid.finished(this, start, s"[QUERY] '$dbName' completed: matched ${docs.size}")
+       docs.map { doc =>
+         val js = decodeFields(fieldsNeedEncode, doc.toJson(jsonWriteSettings).parseJson.convertTo[JsObject])
+         documentHandler.transformViewResult(
+           ddoc,
+           viewName,
+           startKey,
+           endKey,
+           realIncludeDocs,
+           JsObject(js.fields - _computed),
+           MongoDBArtifactStore.this)
+       }
+     }
+     .flatMap(Future.sequence(_))
+     .map(_.flatten.toList)
+     .recover {
+       case t: MongoException =>
+         transid.failed(this, start, s"[QUERY] '$collName' failed; error code: '${t.getCode}'", ErrorLevel)
+         throw new Exception("Unexpected mongodb server error: " + t.getMessage)
+     }
+
+   reportFailure(
+     f,
+     failure =>
+       transid
+         .failed(this, start, s"[QUERY] '$collName' internal error, failure: '${failure.getMessage}'", ErrorLevel))
+ }
+
+ /**
+  * Counts the documents selected by a view and key range.
+  *
+  * @param table the view to count, as `<ddoc>/<viewName>`
+  * @param startKey lower bound of the key range, interpreted by the view mapper
+  * @param endKey upper bound of the key range, interpreted by the view mapper
+  * @param skip number of matching documents to leave out of the count; must not be negative
+  * @param stale not used by this implementation
+  * @return the count reported by MongoDB for the filter with `skip` applied
+  */
+ protected[core] def count(table: String, startKey: List[Any], endKey:
List[Any], skip: Int, stale: StaleParameter)(
+ implicit transid: TransactionId): Future[Long] = {
+ require(skip >= 0, "skip should be non negative")
+
+ val Array(ddoc, viewName) = table.split("/")
+ val start = transid.started(this, LoggingMarkers.DATABASE_QUERY, s"[COUNT]
'$dbName' searching '$table")
+
+ val query = viewMapper.filter(ddoc, viewName, startKey, endKey)
+
+ val option = CountOptions().skip(skip)
+ val f =
+ collection
+ .countDocuments(query, option)
+ .toFuture()
+ .map { result =>
+ transid.finished(this, start, s"[COUNT] '$collName' completed: count
$result")
+ result
+ }
+ .recover {
+ case t: MongoException =>
+ transid.failed(this, start, s"[COUNT] '$collName' failed; error
code: '${t.getCode}'", ErrorLevel)
+ throw new Exception("Unexpected mongodb server error: " +
t.getMessage)
+ }
+
+ reportFailure(
+ f,
+ failure =>
+ transid
+ .failed(this, start, s"[COUNT] '$dbName' internal error, failure:
'${failure.getMessage}'", ErrorLevel))
+ }
+
+ /**
+  * Stores `doc` together with the attachment read from `docStream`.
+  *
+  * The attachment goes to the configured external attachment store when there is one,
+  * otherwise it is handled by MongoDB itself.
+  */
+ override protected[database] def putAndAttach[A <: DocumentAbstraction](
+   doc: A,
+   update: (A, Attached) => A,
+   contentType: ContentType,
+   docStream: Source[ByteString, _],
+   oldAttachment: Option[Attached])(implicit transid: TransactionId): Future[(DocInfo, Attached)] = {
+
+   attachmentStore
+     .map(external => attachToExternalStore(doc, update, contentType, docStream, oldAttachment, external))
+     .getOrElse(attachToMongo(doc, update, contentType, docStream, oldAttachment))
+
+ }
+
+ /**
+  * Stores the attachment with MongoDB and then saves the updated document.
+  *
+  * Depending on the outcome of `inlineOrAttach`, the content is either kept as bytes (only digest
+  * and length are computed here) or uploaded through `attach`. After the document has been
+  * saved, a previous attachment that used the `mongodb` scheme is deleted from GridFS.
+  *
+  * @return the info of the saved document and the metadata of the new attachment
+  */
+ private def attachToMongo[A <: DocumentAbstraction](
+ doc: A,
+ update: (A, Attached) => A,
+ contentType: ContentType,
+ docStream: Source[ByteString, _],
+ oldAttachment: Option[Attached])(implicit transid: TransactionId):
Future[(DocInfo, Attached)] = {
+
+ for {
+ bytesOrSource <- inlineOrAttach(docStream)
+ // every attachment gets a fresh UUID based name
+ uri = uriOf(bytesOrSource, UUID().asString)
+ attached <- {
+ bytesOrSource match {
+ case Left(bytes) =>
+ Future.successful(Attached(uri.toString, contentType,
Some(bytes.size), Some(digest(bytes))))
+ case Right(source) =>
+ attach(doc, uri.path.toString, contentType, source).map { r =>
+ Attached(uri.toString, contentType, Some(r.length),
Some(r.digest))
+ }
+ }
+ }
+ docInfo <- put(update(doc, attached))
+
+ //Remove the old attachment if it was stored in MongoDB (GridFS)
+ _ <- oldAttachment
+ .map { old =>
+ val oldUri = Uri(old.attachmentName)
+ if (oldUri.scheme == mongodbScheme) {
+ val name = oldUri.path.toString
+
gridFSBucket.delete(BsonString(s"${docInfo.id.id}/$name")).toFuture.map { _ =>
+ true
+ }
+ } else {
+ Future.successful(true)
+ }
+ }
+ .getOrElse(Future.successful(true))
+ } yield (docInfo, attached)
+ }
+
+ /**
+  * Uploads an attachment stream to GridFS.
+  *
+  * The GridFS file id is `<document id>/<name>`; the content type and the owning document id
+  * (`belongsTo`) are stored as file metadata.
+  *
+  * @param d the document the attachment belongs to; its record must contain a non-empty `_id`
+  * @param name the attachment name
+  * @param contentType content type recorded in the file metadata
+  * @param docStream the attachment content
+  * @return digest and length of the uploaded content, as reported by `combinedSink`
+  */
+ private def attach(d: DocumentAbstraction, name: String, contentType:
ContentType, docStream: Source[ByteString, _])(
+ implicit transid: TransactionId): Future[AttachResult] = {
+
+ logging.info(this, s"Uploading attach $name")
+ val asJson = d.toDocumentRecord
+ val id: String = asJson.fields("_id").convertTo[String].trim
+ require(!id.isEmpty, "document id must be defined")
+
+ val start = transid.started(
+ this,
+ LoggingMarkers.DATABASE_ATT_SAVE,
+ s"[ATT_PUT] '$collName' uploading attachment '$name' of document 'id:
$id'")
+
+ val document: org.bson.Document = new org.bson.Document("contentType",
contentType.toString)
+ //add the document id to the metadata
+ document.append("belongsTo", id)
+
+ val option = new GridFSUploadOptions().metadata(document)
+
+ val uploadStream = gridFSBucket.openUploadStream(BsonString(s"$id/$name"),
name, option)
+ val sink = MongoDBAsyncStreamSink(uploadStream)
+
+ val f = docStream
+ .runWith(combinedSink(sink))
+ .map { r =>
+ transid
+ .finished(this, start, s"[ATT_PUT] '$collName' completed uploading
attachment '$name' of document '$id'")
+ AttachResult(r.digest, r.length)
+ }
+ .recover {
+ case t: MongoException =>
+ transid.failed(
+ this,
+ start,
+ s"[ATT_PUT] '$collName' failed to upload attachment '$name' of
document '$id'; error code '${t.getCode}'",
+ ErrorLevel)
+ throw new Exception("Unexpected mongodb server error: " +
t.getMessage)
+ }
+
+ reportFailure(
+ f,
+ failure =>
+ transid.failed(
+ this,
+ start,
+ s"[ATT_PUT] '$collName' internal error, name: '$name', doc: '$id',
failure: '${failure.getMessage}'",
+ ErrorLevel))
+ }
+
+ /**
+  * Streams the content of an attachment into `sink`.
+  *
+  * The source is selected by the scheme of the attachment name: the in-memory scheme, the
+  * `mongodb` scheme (or a relative name), or the scheme of the configured attachment store.
+  * Any other scheme is rejected with an IllegalArgumentException.
+  *
+  * @param doc the document the attachment belongs to
+  * @param attached the attachment metadata; its name is parsed as a URI
+  * @param sink the sink that consumes the attachment bytes
+  * @return the value materialized by `sink`
+  */
+ override protected[core] def readAttachment[T](doc: DocInfo, attached:
Attached, sink: Sink[ByteString, Future[T]])(
+ implicit transid: TransactionId): Future[T] = {
+
+ val name = attached.attachmentName
+ val attachmentUri = Uri(name)
+
+ attachmentUri.scheme match {
+ case AttachmentSupport.MemScheme =>
+ memorySource(attachmentUri).runWith(sink)
+ case s if s == mongodbScheme || attachmentUri.isRelative =>
+ //relative case is for compatibility with earlier naming approach
where attachment name would be like 'jarfile'
+ //Compared to current approach of '<scheme>:<name>'
+ readAttachmentFromMongo(doc, attachmentUri, sink)
+ case s if attachmentStore.isDefined && attachmentStore.get.scheme == s =>
+ attachmentStore.get.readAttachment(doc.id,
attachmentUri.path.toString, sink)
+ case _ =>
+ throw new IllegalArgumentException(s"Unknown attachment scheme in
attachment uri $attachmentUri")
+ }
+ }
+
+ /**
+  * Reads an attachment from GridFS and streams it into `sink`.
+  *
+  * The GridFS file id is `<document id>/<path of attachmentUri>`. The file entry is fetched
+  * first, so a missing file is reported as NoDocumentException before any content is streamed.
+  *
+  * @param doc the owning document; id and revision must be set
+  * @param attachmentUri the attachment name as a URI; only its path is used
+  * @param sink the sink that consumes the attachment bytes
+  * @return the value materialized by `sink`
+  */
+ private def readAttachmentFromMongo[T](doc: DocInfo, attachmentUri: Uri,
sink: Sink[ByteString, Future[T]])(
+ implicit transid: TransactionId): Future[T] = {
+
+ val attachmentName = attachmentUri.path.toString
+ val start = transid.started(
+ this,
+ LoggingMarkers.DATABASE_ATT_GET,
+ s"[ATT_GET] '$dbName' finding attachment '$attachmentName' of document
'$doc'")
+
+ require(doc != null, "doc undefined")
+ require(doc.rev.rev != null, "doc revision must be specified")
+
+ val downloadStream =
gridFSBucket.openDownloadStream(BsonString(s"${doc.id.id}/$attachmentName"))
+
+ // streams the content into the sink; the `file` argument itself is not used, it only orders
+ // this step after the file lookup in the for-comprehension below
+ def readStream(file: GridFSFile) = {
+ val source = MongoDBAsyncStreamSource(downloadStream)
+ source
+ .runWith(sink)
+ .map { result =>
+ transid
+ .finished(
+ this,
+ start,
+ s"[ATT_GET] '$collName' completed: found attachment
'$attachmentName' of document '$doc'")
+ result
+ }
+ }
+
+ // fetches the GridFS file entry and maps GridFS failures to the store's exceptions
+ def getGridFSFile = {
+ downloadStream
+ .gridFSFile()
+ .head()
+ .transform(
+ identity, {
+ case ex: MongoGridFSException if ex.getMessage.contains("File not
found") =>
+ transid.finished(
+ this,
+ start,
+ s"[ATT_GET] '$collName', retrieving attachment
'$attachmentName' of document '$doc'; not found.")
+ NoDocumentException("Not found on 'readAttachment'.")
+ case ex: MongoGridFSException =>
+ transid.failed(
+ this,
+ start,
+ s"[ATT_GET] '$collName' failed to get attachment
'$attachmentName' of document '$doc'; error code: '${ex.getCode}'",
+ ErrorLevel)
+ throw new Exception("Unexpected mongodb server error: " +
ex.getMessage)
+ case t => t
+ })
+ }
+
+ val f = for {
+ file <- getGridFSFile
+ result <- readStream(file)
+ } yield result
+
+ reportFailure(
+ f,
+ failure =>
+ transid.failed(
+ this,
+ start,
+ s"[ATT_GET] '$dbName' internal error, name: '$attachmentName', doc:
'$doc', failure: '${failure.getMessage}'",
+ ErrorLevel))
+
+ }
+
+ /**
+  * Deletes the attachments of a document from the external attachment store, if one is
+  * configured; without an external store this completes with `true` and does nothing.
+  */
+ override protected[core] def deleteAttachments[T](doc: DocInfo)(implicit transid: TransactionId): Future[Boolean] =
+   attachmentStore match {
+     case Some(external) => external.deleteAttachments(doc.id)
+     case None           => Future.successful(true) // For MongoDB it is expected that the entire document is deleted.
+   }
+
+ // Shuts down the external attachment store, if one is configured. The MongoClient is not closed here.
+ override def shutdown(): Unit = {
+ // MongoClient maintains the connection pool internally, we don't need to
manage it
+ attachmentStore.foreach(_.shutdown())
+ }
+
+ // Registers `onFailure` as a side effect for unexpected failures of `f` and returns `f` unchanged.
+ private def reportFailure[T, U](f: Future[T], onFailure: Throwable => U): Future[T] = {
+   f.onComplete {
+     // These failures are intentional and shouldn't trigger the catcher.
+     case scala.util.Failure(_: ArtifactStoreException) =>
+     case scala.util.Failure(unexpected)                => onFailure(unexpected)
+     case _                                             => // success, nothing to report
+   }
+   f
+ }
+
+ /**
+  * Calculates the revision manually, to be compatible with couchdb's `_rev` field.
+  *
+  * The new revision is `<n>-<hash>`, where `<hash>` is the first 32 hex characters of the digest
+  * of the document's string form and `<n>` is the counter of the current `_rev` plus one, or 1
+  * when the document has no `_rev`.
+  *
+  * @param doc the document as JSON
+  * @return (current revision or "" if there is none, new revision)
+  */
+ private def revisionCalculate(doc: JsObject): (String, String) = {
+   val md = StoreUtils.emptyDigest()
+   // encode explicitly as UTF-8 so the hash does not depend on the default charset of the JVM
+   val new_rev = md
+     .digest(doc.toString.getBytes(java.nio.charset.StandardCharsets.UTF_8))
+     .map(b => "%02x".format(0xFF & b))
+     .mkString
+     .take(32)
+   doc.fields.get("_rev") match {
+     case Some(value) =>
+       val current = value.convertTo[String].trim
+       val next = current.split("-").apply(0).toInt + 1
+       (current, s"$next-$new_rev")
+     case None =>
+       ("", s"1-$new_rev")
+   }
+ }
+
+ /**
+  * Applies `attachmentHandler` to `doc` when `exec.code` in the stored JSON is an object that
+  * describes an attachment; in every other case `doc` is returned unchanged.
+  */
+ private def processAttachments[A <: DocumentAbstraction](doc: A,
+                                                          js: JsObject,
+                                                          docId: String,
+                                                          attachmentHandler: (A, Attached) => A): A = {
+   val codeField = js.fields("exec").asJsObject().fields.get("code")
+   codeField match {
+     case Some(code: JsObject) =>
+       code.getFields("attachmentName", "attachmentType", "digest", "length") match {
+         case Seq(JsString(name), JsString(contentTypeValue), JsString(digest), JsNumber(length)) =>
+           val contentType = ContentType.parse(contentTypeValue) match {
+             case Right(ct) => ct
+             case Left(_)   => ContentTypes.NoContentType //Should not happen
+           }
+           attachmentHandler(doc, Attached(getAttachmentName(name), contentType, Some(length.longValue), Some(digest)))
+         case x =>
+           throw DeserializationException("Attachment json does not have required fields" + x)
+       }
+     // code is present but not an object, or absent altogether
+     // (the latter should not happen as an action always contain field: exec.code)
+     case _ => doc
+   }
+ }
+
+ /**
+  * Determines whether the attachment name conforms to the new UUID based scheme:
+  * if it does, the name is turned into a `<scheme>:<name>` URI, otherwise it is returned as is.
+  */
+ private def getAttachmentName(name: String): String = {
+   Try {
+     java.util.UUID.fromString(name) // fails for names that are not UUIDs
+     Uri.from(scheme = attachmentScheme, path = name).toString
+   }.getOrElse(name)
+ }
+
+ /**
+  * Returns the collection for `collName` and, in the background, creates every index of the
+  * view mapper that the collection does not have yet.
+  *
+  * The collection is returned immediately without waiting for the indexes. A failure while
+  * listing or creating indexes is logged instead of being dropped silently.
+  */
+ private def getCollectionAndCreateIndexes(): MongoCollection[Document] = {
+   val coll = database.getCollection(collName)
+   // create indexes in specific collection if they do not exist
+   val indexing = coll.listIndexes().toFuture().flatMap { idxes =>
+     val keys = idxes.map {
+       _.get("key").map { fields =>
+         Document(fields.asDocument())
+       } getOrElse {
+         Document.empty // this should not happen
+       }
+     }
+
+     val missing = viewMapper.indexes.filterNot(idx => keys.contains(idx))
+     // all createIndex calls are issued right away, the combined future is only used for reporting
+     Future.sequence(missing.map(idx => coll.createIndex(idx).toFuture).toList)
+   }
+   indexing.failed.foreach { t =>
+     logging.error(this, s"failed to create indexes for collection '$collName': ${t.getMessage}")
+   }
+   coll
+ }
+
+ // Replaces each listed field that is present by a JsString holding its compact JSON text,
+ // so that values with a complex and arbitrary structure are stored as plain strings
+ private def encodeFields(fields: Seq[String], jsValue: JsObject): JsObject = {
+   val encoded = fields.foldLeft(jsValue.fields) { (acc, field) =>
+     acc.get(field) match {
+       case Some(value) => acc.updated(field, JsString(value.compactPrint))
+       case None        => acc
+     }
+   }
+   JsObject(encoded)
+ }
+
+ // Parses each listed field that is present back from its JsString form into JSON;
+ // a field that is not a JsString or does not parse is left untouched
+ private def decodeFields(fields: Seq[String], jsValue: JsObject): JsObject = {
+   val decoded = fields.foldLeft(jsValue.fields) { (acc, field) =>
+     acc.get(field) match {
+       case Some(value) =>
+         Try(value.asInstanceOf[JsString].value.parseJson)
+           .map(parsed => acc.updated(field, parsed))
+           .getOrElse(acc)
+       case None => acc
+     }
+   }
+   JsObject(decoded)
+ }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala
new file mode 100644
index 0000000..555b274
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreProvider.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import org.apache.openwhisk.common.Logging
+import org.apache.openwhisk.core.ConfigKeys
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.entity.size._
+import org.apache.openwhisk.core.entity.{DocumentReader, WhiskActivation,
WhiskAuth, WhiskEntity}
+import org.mongodb.scala.MongoClient
+import pureconfig._
+import pureconfig.generic.auto._
+import spray.json.RootJsonFormat
+
+import scala.reflect.ClassTag
+
+// Settings of the MongoDB artifact store: the connection string passed to MongoClient and the
+// name of the database in which the collections live.
+case class MongoDBConfig(uri: String, database: String) {
+ // both settings must be non-blank
+ assume(Set(database, uri).forall(_.trim.nonEmpty), "At least one expected
property is missing")
+
+ // collection name for an entity type: the lower-cased simple name of its runtime class
+ def collectionFor[D](implicit tag: ClassTag[D]) =
tag.runtimeClass.getSimpleName.toLowerCase
+}
+
+/**
+ * Holder of the single MongoClient shared by all stores created in this process.
+ *
+ * The client is created from the configuration of the first call; later calls return that same
+ * client, whatever configuration they pass.
+ */
+object MongoDBClient {
+  private var _client: Option[MongoClient] = None
+
+  /**
+   * Returns the shared client, creating it on first use.
+   *
+   * The check and the assignment happen under the object's lock, so concurrent first calls
+   * cannot create more than one client.
+   *
+   * @param config supplies the connection string used when the client has to be created
+   */
+  def client(config: MongoDBConfig): MongoClient = synchronized {
+    _client.getOrElse {
+      val created = MongoClient(config.uri)
+      _client = Some(created)
+      created
+    }
+  }
+}
+
+/**
+ * ArtifactStoreProvider that creates MongoDB backed artifact stores.
+ */
+object MongoDBArtifactStoreProvider extends ArtifactStoreProvider {
+
+  /**
+   * Creates a store for entity type `D`, configured from `ConfigKeys.mongodb`.
+   *
+   * @param useBatching not used by this implementation
+   */
+  def makeStore[D <: DocumentSerializer: ClassTag](useBatching: Boolean)(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
+    val dbConfig = loadConfigOrThrow[MongoDBConfig](ConfigKeys.mongodb)
+    makeArtifactStore(dbConfig, getAttachmentStore())
+  }
+
+  /**
+   * Creates a store for entity type `D` from an explicit configuration.
+   *
+   * @param dbConfig connection string and database name
+   * @param attachmentStore optional external store for attachments
+   */
+  def makeArtifactStore[D <: DocumentSerializer: ClassTag](dbConfig: MongoDBConfig,
+                                                            attachmentStore: Option[AttachmentStore])(
+    implicit jsonFormat: RootJsonFormat[D],
+    docReader: DocumentReader,
+    actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): ArtifactStore[D] = {
+
+    val inliningConfig = loadConfigOrThrow[InliningConfig](ConfigKeys.db)
+
+    val (handler, mapper) = handlerAndMapper(implicitly[ClassTag[D]])
+
+    new MongoDBArtifactStore[D](
+      MongoDBClient.client(dbConfig),
+      dbConfig.database,
+      dbConfig.collectionFor[D],
+      handler,
+      mapper,
+      inliningConfig,
+      attachmentStore)
+  }
+
+  // Selects the document handler and view mapper that belong to the given entity type.
+  private def handlerAndMapper[D](entityType: ClassTag[D])(
+    implicit actorSystem: ActorSystem,
+    logging: Logging,
+    materializer: ActorMaterializer): (DocumentHandler, MongoDBViewMapper) = {
+    entityType.runtimeClass match {
+      case x if x == classOf[WhiskEntity] =>
+        (WhisksHandler, WhisksViewMapper)
+      case x if x == classOf[WhiskActivation] =>
+        (ActivationHandler, ActivationViewMapper)
+      case x if x == classOf[WhiskAuth] =>
+        (SubjectHandler, SubjectViewMapper)
+      case other =>
+        // fail with a descriptive message instead of a bare MatchError
+        throw new IllegalArgumentException(s"Unsupported entity type for MongoDB artifact store: ${other.getName}")
+    }
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala
new file mode 100644
index 0000000..6a71558
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSink.scala
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import java.nio.ByteBuffer
+
+import akka.Done
+import akka.stream.{Attributes, IOResult, Inlet, SinkShape}
+import akka.stream.scaladsl.Sink
+import akka.stream.stage.{AsyncCallback, GraphStageLogic,
GraphStageWithMaterializedValue, InHandler}
+import akka.util.ByteString
+import org.mongodb.scala.Completed
+import org.mongodb.scala.gridfs.{AsyncOutputStream}
+
+import scala.concurrent.{ExecutionContext, Future, Promise}
+import scala.util.{Failure, Success, Try}
+
+/**
+ * Sink stage that writes the incoming ByteStrings to a GridFS AsyncOutputStream.
+ *
+ * Each pushed ByteString is written buffer by buffer; the next element is only pulled after
+ * the last buffer of the current one has been written. When upstream finishes, the stream is
+ * closed after the pending writes are done. The materialized future is completed with the
+ * number of bytes written and the outcome.
+ *
+ * @param stream the GridFS output stream to write to
+ */
+class MongoDBAsyncStreamSink(stream: AsyncOutputStream)(implicit ec: ExecutionContext)
+    extends GraphStageWithMaterializedValue[SinkShape[ByteString], Future[IOResult]] {
+  val in: Inlet[ByteString] = Inlet("AsyncStream.in")
+
+  override val shape: SinkShape[ByteString] = SinkShape(in)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
+    val ioResultPromise = Promise[IOResult]()
+    val logic = new GraphStageLogic(shape) with InHandler {
+      handler =>
+      var buffers: Iterator[ByteBuffer] = Iterator()
+      var writeCallback: AsyncCallback[Try[Int]] = _
+      var closeCallback: AsyncCallback[Try[Completed]] = _
+      var position: Int = _
+      // Starts out completed: before the first push there is no write to wait for. Otherwise an
+      // upstream that finishes without pushing anything would never trigger the close, and the
+      // stage and its materialized future would never complete.
+      var writeDone: Promise[Completed] = Promise.successful(Completed())
+
+      setHandler(in, this)
+
+      override def preStart(): Unit = {
+        //close operation is async and thus requires the stage to remain open
+        //even after all data is read
+        setKeepGoing(true)
+        writeCallback = getAsyncCallback[Try[Int]](handleWriteResult)
+        closeCallback = getAsyncCallback[Try[Completed]](handleClose)
+        pull(in)
+      }
+
+      override def onPush(): Unit = {
+        buffers = grab(in).asByteBuffers.iterator
+        // a fresh promise per element; it is completed once all its buffers are written
+        writeDone = Promise[Completed]()
+        writeNextBufferOrPull()
+      }
+
+      override def onUpstreamFinish(): Unit = {
+        //Work done perform close
+        //Using async "blessed" callback does not work at this stage so
+        // need to invoke as normal callback
+        //TODO Revisit this
+
+        //write of ByteBuffers from ByteString is an async operation. For last push
+        //the write operation may involve multiple async callbacks and by that time
+        //onUpstreamFinish may get invoked. So to ensure that close operation is performed
+        //"after" the last push writes are done we rely on writeDone promise
+        //and schedule the close on its completion
+        writeDone.future.onComplete(_ => stream.close().head().onComplete(handleClose))
+      }
+
+      override def onUpstreamFailure(ex: Throwable): Unit = {
+        fail(ex)
+      }
+
+      private def handleWriteResult(bytesWrittenOrFailure: Try[Int]): Unit = bytesWrittenOrFailure match {
+        case Success(bytesWritten) =>
+          position += bytesWritten
+          writeNextBufferOrPull()
+        case Failure(failure) => fail(failure)
+      }
+
+      private def handleClose(completed: Try[Completed]): Unit = completed match {
+        case Success(Completed()) =>
+          completeStage()
+          ioResultPromise.trySuccess(IOResult(position, Success(Done)))
+        case Failure(failure) =>
+          fail(failure)
+      }
+
+      private def writeNextBufferOrPull(): Unit = {
+        if (buffers.hasNext) {
+          stream.write(buffers.next()).head().onComplete(writeCallback.invoke)
+        } else {
+          writeDone.trySuccess(Completed())
+          pull(in)
+        }
+      }
+
+      // fails the stage; the materialized future still completes successfully, carrying the failure
+      private def fail(failure: Throwable) = {
+        failStage(failure)
+        ioResultPromise.trySuccess(IOResult(position, Failure(failure)))
+      }
+
+    }
+    (logic, ioResultPromise.future)
+  }
+}
+
+object MongoDBAsyncStreamSink {
+  // Wraps the stage in a Sink that materializes the IOResult future.
+  def apply(stream: AsyncOutputStream)(implicit ec: ExecutionContext): Sink[ByteString, Future[IOResult]] = {
+    Sink.fromGraph(new MongoDBAsyncStreamSink(stream))
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala
new file mode 100644
index 0000000..1a7fad9
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamSource.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import java.nio.ByteBuffer
+
+import akka.Done
+import akka.stream.SourceShape
+import akka.stream.Attributes
+import akka.stream.Outlet
+import akka.stream.IOResult
+import akka.stream.scaladsl.Source
+import akka.stream.stage.GraphStageLogic
+import akka.stream.stage.OutHandler
+import akka.stream.stage.GraphStageWithMaterializedValue
+import akka.stream.stage.AsyncCallback
+import akka.util.ByteString
+import org.mongodb.scala.Completed
+import org.mongodb.scala.gridfs.AsyncInputStream
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.Promise
+import scala.util.Success
+import scala.util.Try
+import scala.util.Failure
+
+/**
+ * Source stage that emits the content of a MongoDB GridFS `AsyncInputStream` as `ByteString` chunks.
+ *
+ * Every downstream pull triggers one asynchronous read of at most `chunkSize` bytes. A negative
+ * read result is treated as end of stream: the underlying stream is closed and the stage completes
+ * once the close has finished. The materialized `Future[IOResult]` carries the number of bytes
+ * emitted together with `Success(Done)` or the failure that terminated the stage.
+ *
+ * The underlying stream is additionally closed when the stage stops for any other reason
+ * (downstream cancellation, read failure), so it is not leaked and the materialized future is
+ * always completed.
+ *
+ * @param stream    asynchronous input stream to read from
+ * @param chunkSize capacity of the read buffer in bytes; must be greater than 0
+ * @param ec        execution context used for the completion callbacks of the driver futures
+ */
+class MongoDBAsyncStreamSource(stream: AsyncInputStream, chunkSize: Int)(implicit ec: ExecutionContext)
+    extends GraphStageWithMaterializedValue[SourceShape[ByteString], Future[IOResult]] {
+  require(chunkSize > 0, "chunkSize must be greater than 0")
+  val out: Outlet[ByteString] = Outlet("AsyncStream.out")
+
+  override val shape: SourceShape[ByteString] = SourceShape(out)
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[IOResult]) = {
+    val ioResultPromise = Promise[IOResult]()
+    val logic = new GraphStageLogic(shape) with OutHandler {
+      val buffer = ByteBuffer.allocate(chunkSize)
+      var readCallback: AsyncCallback[Try[Int]] = _
+      var closeCallback: AsyncCallback[Try[Completed]] = _
+      // IOResult.count is a Long; counting in Long avoids overflow for content larger than 2 GiB.
+      var position: Long = _
+      // Set as soon as a close of the underlying stream has been initiated, so it is closed only once.
+      private var closeRequested = false
+
+      setHandler(out, this)
+
+      override def preStart(): Unit = {
+        // Async callbacks bring the results of the driver futures back into the stage.
+        readCallback = getAsyncCallback[Try[Int]](handleBufferRead)
+        closeCallback = getAsyncCallback[Try[Completed]](handleClose)
+      }
+
+      override def onPull(): Unit = {
+        stream.read(buffer).head().onComplete(readCallback.invoke)
+      }
+
+      override def postStop(): Unit = {
+        // Reached on completion, failure and downstream cancellation alike. If the regular
+        // end-of-stream close never started, release the underlying stream now (fire and forget).
+        if (!closeRequested) {
+          stream.close().head()
+          closeRequested = true
+        }
+        // No-op when handleClose or fail already completed the promise.
+        ioResultPromise.trySuccess(IOResult(position, Success(Done)))
+      }
+
+      private def handleBufferRead(bytesReadOrFailure: Try[Int]): Unit = bytesReadOrFailure match {
+        case Success(bytesRead) if bytesRead >= 0 =>
+          buffer.flip
+          // fromByteBuffer copies the bytes, so the buffer can be reused for the next read.
+          push(out, ByteString.fromByteBuffer(buffer))
+          buffer.clear
+          position += bytesRead
+        case Success(_) =>
+          // Negative count: nothing left to read, so close the stream and complete afterwards.
+          closeRequested = true
+          stream.close().head().onComplete(closeCallback.invoke)
+        case Failure(failure) =>
+          fail(failure)
+      }
+
+      private def handleClose(completed: Try[Completed]): Unit = completed match {
+        case Success(Completed()) =>
+          completeStage()
+          ioResultPromise.trySuccess(IOResult(position, Success(Done)))
+        case Failure(failure) =>
+          fail(failure)
+      }
+
+      // Fails the stage and reports the bytes emitted so far together with the failure.
+      private def fail(failure: Throwable) = {
+        failStage(failure)
+        ioResultPromise.trySuccess(IOResult(position, Failure(failure)))
+      }
+    }
+    (logic, ioResultPromise.future)
+  }
+}
+
+object MongoDBAsyncStreamSource {
+
+  /**
+   * Wraps a [[MongoDBAsyncStreamSource]] stage for the given stream into an Akka `Source`.
+   *
+   * @param stream    asynchronous input stream to read from
+   * @param chunkSize read buffer size in bytes, 512 KiB unless specified
+   * @return a source of `ByteString` materializing a `Future[IOResult]`
+   */
+  def apply(stream: AsyncInputStream, chunkSize: Int = 512 * 1024)(
+    implicit ec: ExecutionContext): Source[ByteString, Future[IOResult]] = {
+    val stage = new MongoDBAsyncStreamSource(stream, chunkSize)
+    Source.fromGraph(stage)
+  }
+}
diff --git
a/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala
new file mode 100644
index 0000000..fe792be
--- /dev/null
+++
b/common/scala/src/main/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapper.scala
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database._
+import org.apache.openwhisk.core.entity.WhiskQueries
+import org.mongodb.scala.Document
+import org.mongodb.scala.bson.conversions.Bson
+import org.mongodb.scala.model.Filters._
+import org.mongodb.scala.model.Sorts
+
+trait MongoDBViewMapper { // Contract for turning a (ddoc, view, startKey, endKey) query into MongoDB filter and sort Bson
+ protected val _computed: String = "_computed"
+ protected val TOP: String = WhiskQueries.TOP
+
+ val indexes: List[Document] // supplied by each mapper; the visible implementations list field -> -1 documents
+
+ def filter(ddoc: String, view: String, startKey: List[Any], endKey:
List[Any]): Bson // Bson filter for the given view and key range
+
+ def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] // optional sort specification for the given view
+
+ protected def checkKeys(startKey: List[Any], endKey: List[Any]): Unit = { // both keys must be non-empty and start with the same element
+ require(startKey.nonEmpty)
+ require(endKey.nonEmpty)
+ require(startKey.head == endKey.head, s"First key should be same =>
($startKey) - ($endKey)")
+ }
+}
+
+/**
+ * View mapper for the "activations" view. Design documents whose name starts with
+ * "whisks-filters" are matched on the computed namespace-with-path field, those starting with
+ * "whisks" on the plain namespace field; anything else is rejected with UnsupportedView.
+ */
+private object ActivationViewMapper extends MongoDBViewMapper {
+  private val NS = "namespace"
+  private val NS_WITH_PATH = s"${_computed}.${ActivationHandler.NS_PATH}"
+  private val START = "start"
+
+  override val indexes: List[Document] = List(
+    Document(START -> -1),
+    Document(START -> -1, NS -> -1),
+    Document(NS_WITH_PATH -> -1, START -> -1))
+
+  override def filter(ddoc: String, view: String, startKey: List[Any], endKey: List[Any]): Bson = {
+    checkKeys(startKey, endKey)
+    createActivationFilter(namespaceField(ddoc, view), startKey, endKey)
+  }
+
+  override def sort(ddoc: String, view: String, descending: Boolean): Option[Bson] = {
+    val nsField = namespaceField(ddoc, view)
+    val ordering =
+      if (descending) Sorts.descending(nsField, START)
+      else Sorts.ascending(nsField, START)
+    Some(ordering)
+  }
+
+  // Resolves the field holding the first query key; "whisks-filters" has to be tested before
+  // "whisks" because the latter is a prefix of the former.
+  private def namespaceField(ddoc: String, view: String): String =
+    if (view != "activations") throw UnsupportedView(s"$ddoc/$view")
+    else if (ddoc.startsWith("whisks-filters")) NS_WITH_PATH
+    else if (ddoc.startsWith("whisks")) NS
+    else throw UnsupportedView(s"$ddoc/$view")
+
+  // Builds the filter from the shape of the key lists: namespace only, namespace plus lower
+  // bound on start, or namespace plus lower and upper bound on start.
+  private def createActivationFilter(nsPropName: String, startKey: List[Any], endKey: List[Any]) = {
+    require(startKey.head.isInstanceOf[String])
+    val matchNS = equal(nsPropName, startKey.head)
+
+    (startKey, endKey) match {
+      case (List(_), List(_, `TOP`)) =>
+        matchNS
+      case (List(_, since), List(_, `TOP`, `TOP`)) =>
+        and(matchNS, gte(START, since))
+      case (List(_, since), List(_, upto, `TOP`)) =>
+        and(matchNS, gte(START, since), lte(START, upto))
+      case _ =>
+        throw UnsupportedQueryKeys(s"$startKey, $endKey")
+    }
+  }
+}
+
+private object WhisksViewMapper extends MongoDBViewMapper { // Filters and sort orders for the views actions, rules, triggers, packages, packages-public and all
+ private val NS = "namespace"
+ private val ROOT_NS = s"${_computed}.${WhisksHandler.ROOT_NS}"
+ private val TYPE = "entityType"
+ private val UPDATED = "updated"
+ private val PUBLISH = "publish"
+ private val BINDING = "binding"
+ override val indexes: List[Document] =
+ List(Document(s"$NS" -> -1, s"$UPDATED" -> -1), Document(s"$ROOT_NS" ->
-1, s"$UPDATED" -> -1))
+
+ override def filter(ddoc: String, view: String, startKey: List[Any], endKey:
List[Any]): Bson = {
+ checkKeys(startKey, endKey)
+ view match {
+ case "all" => listAllInNamespace(ddoc, view, startKey, endKey) // any document that has an entityType field, matched on the root namespace
+ case _ => listCollectionInNamespace(ddoc, view, startKey, endKey) // other view names are resolved to an entity type by getEntityType
+ }
+ }
+
+ private def listCollectionInNamespace(ddoc: String, view: String, startKey:
List[Any], endKey: List[Any]): Bson = {
+
+ val entityType = getEntityType(ddoc, view) // throws UnsupportedView for unknown view names
+
+ val matchType = equal(TYPE, entityType)
+ val matchNS = equal(NS, startKey.head)
+ val matchRootNS = equal(ROOT_NS, startKey.head)
+
+ val filter = (startKey, endKey) match { // the first key is matched against either the namespace or the computed root namespace
+ case (ns :: Nil, _ :: `TOP` :: Nil) =>
+ or(and(matchType, matchNS), and(matchType, matchRootNS))
+ case (ns :: since :: Nil, _ :: `TOP` :: `TOP` :: Nil) =>
+ // @formatter:off
+ or(
+ and(matchType, matchNS, gte(UPDATED, since)),
+ and(matchType, matchRootNS, gte(UPDATED, since))
+ )
+ // @formatter:on
+ case (ns :: since :: Nil, _ :: upto :: `TOP` :: Nil) =>
+ or(
+ and(matchType, matchNS, gte(UPDATED, since), lte(UPDATED, upto)),
+ and(matchType, matchRootNS, gte(UPDATED, since), lte(UPDATED, upto)))
+ case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey,
$endKey)")
+ }
+ if (view == "packages-public") // public packages additionally require an empty binding and publish == true
+ and(equal(BINDING, Map.empty), equal(PUBLISH, true), filter)
+ else
+ filter
+ }
+
+ private def listAllInNamespace(ddoc: String, view: String, startKey:
List[Any], endKey: List[Any]): Bson = {
+ val matchRootNS = equal(ROOT_NS, startKey.head)
+ val filter = (startKey, endKey) match { // only the (ns) .. (ns, TOP) key range is supported for this view
+ case (ns :: Nil, _ :: `TOP` :: Nil) =>
+ and(exists(TYPE), matchRootNS)
+ case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey,
$endKey)")
+ }
+ filter
+ }
+
+ override def sort(ddoc: String, view: String, descending: Boolean):
Option[Bson] = {
+ view match { // every supported view is ordered by the updated field only
+ case "actions" | "rules" | "triggers" | "packages" | "packages-public" |
"all"
+ if ddoc.startsWith("whisks") || ddoc.startsWith("all-whisks") =>
+ val sort = if (descending) Sorts.descending(UPDATED) else
Sorts.ascending(UPDATED)
+ Some(sort)
+ case _ => throw UnsupportedView(s"$ddoc/$view")
+ }
+ }
+
+ private def getEntityType(ddoc: String, view: String): String = view match { // view name -> value expected in the entityType field
+ case "actions" => "action"
+ case "rules" => "rule"
+ case "triggers" => "trigger"
+ case "packages" | "packages-public" => "package"
+ case _ => throw
UnsupportedView(s"$ddoc/$view")
+ }
+}
+private object SubjectViewMapper extends MongoDBViewMapper { // Filters for the subjects "identities" view and the "blockedNamespaces" view; neither view is sorted
+ private val BLOCKED = "blocked"
+ private val SUBJECT = "subject"
+ private val UUID = "uuid"
+ private val KEY = "key"
+ private val NS_NAME = "namespaces.name"
+ private val NS_UUID = "namespaces.uuid"
+ private val NS_KEY = "namespaces.key"
+ private val CONCURRENT_INVOCATIONS = "concurrentInvocations"
+ private val INVOCATIONS_PERMINUTE = "invocationsPerMinute"
+ override val indexes: List[Document] =
+ List(Document(s"$NS_NAME" -> -1), Document(s"$NS_UUID" -> -1, s"$NS_KEY"
-> -1))
+
+ override def filter(ddoc: String, view: String, startKey: List[Any], endKey:
List[Any]): Bson = {
+ require(startKey == endKey, s"startKey: $startKey and endKey: $endKey must
be same for $ddoc/$view")
+ (ddoc, view) match { // startKey and endKey have been required to be equal at this point
+ case (s, "identities") if s.startsWith("subjects") =>
+ filterForMatchingSubjectOrNamespace(ddoc, view, startKey, endKey)
+ case ("namespaceThrottlings", "blockedNamespaces") =>
+ or(equal(BLOCKED, true), equal(CONCURRENT_INVOCATIONS, 0), // the key values are not used for this view
equal(INVOCATIONS_PERMINUTE, 0))
+ case _ =>
+ throw UnsupportedView(s"$ddoc/$view")
+ }
+ }
+
+ override def sort(ddoc: String, view: String, descending: Boolean):
Option[Bson] = {
+ (ddoc, view) match { // supported views yield None; unknown ones are rejected
+ case (s, "identities") if s.startsWith("subjects") => None
+ case ("namespaceThrottlings", "blockedNamespaces") => None
+ case _ =>
+ throw UnsupportedView(s"$ddoc/$view")
+ }
+ }
+
+ private def filterForMatchingSubjectOrNamespace(ddoc: String,
+ view: String,
+ startKey: List[Any],
+ endKey: List[Any]): Bson = {
+ val notBlocked = notEqual(BLOCKED, true) // applied to both key shapes below
+ startKey match {
+ case (ns: String) :: Nil => and(notBlocked, // single key: compared with subject and namespaces.name
or(equal(SUBJECT, ns), equal(NS_NAME, ns)))
+ case (uuid: String) :: (key: String) :: Nil => // two keys: compared with uuid/key and namespaces.uuid/namespaces.key
+ // @formatter:off
+ and(
+ notBlocked,
+ or(
+ and(equal(UUID, uuid), equal(KEY, key)),
+ and(equal(NS_UUID, uuid), equal(NS_KEY, key))
+ ))
+ // @formatter:on
+ case _ => throw UnsupportedQueryKeys(s"$ddoc/$view -> ($startKey,
$endKey)")
+ }
+ }
+
+}
diff --git a/tests/src/test/resources/application.conf.j2
b/tests/src/test/resources/application.conf.j2
index 944f2af..067a736 100644
--- a/tests/src/test/resources/application.conf.j2
+++ b/tests/src/test/resources/application.conf.j2
@@ -67,6 +67,11 @@ whisk {
throughput = 400
}
+ mongodb {
+ uri = ${?MONGODB_CONNECT_STRING}
+ database = ${?MONGODB_DATABASE}
+ }
+
controller {
protocol = {{ controller.protocol }}
https {
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala
new file mode 100644
index 0000000..24c62c1
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBArtifactStoreTests.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehavior
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBArtifactStoreTests extends FlatSpec with MongoDBStoreBehaviorBase
with ArtifactStoreBehavior {} // No tests of its own: everything comes from ArtifactStoreBehavior, wired to MongoDB by MongoDBStoreBehaviorBase
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala
new file mode 100644
index 0000000..90770f3
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAsyncStreamGraphTests.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import java.io.{ByteArrayInputStream, ByteArrayOutputStream, IOException,
InputStream}
+
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Keep, Sink, Source, StreamConverters}
+import akka.stream.testkit.TestSubscriber
+import akka.util.ByteString
+import common.WskActorSystem
+import org.apache.commons.io.IOUtils
+import org.junit.runner.RunWith
+import org.mockito.ArgumentMatchers._
+import org.mockito.Mockito._
+import org.mongodb.scala.gridfs.helpers.AsyncStreamHelper
+import org.scalatest.{FlatSpec, Matchers}
+import org.scalatest.concurrent.{IntegrationPatience, ScalaFutures}
+import org.scalatest.junit.JUnitRunner
+import org.scalatestplus.mockito.MockitoSugar
+
+import scala.util.Random
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBAsyncStreamGraphTests // Exercises MongoDBAsyncStreamSource and MongoDBAsyncStreamSink with in-memory streams adapted by AsyncStreamHelper
+ extends FlatSpec
+ with Matchers
+ with ScalaFutures
+ with WskActorSystem
+ with MockitoSugar
+ with IntegrationPatience {
+
+ implicit val mat = ActorMaterializer()
+
+ behavior of "MongoDBAsyncStreamSource"
+
+ it should "read all bytes" in {
+ val bytes = randomBytes(4000)
+ val asyncStream = AsyncStreamHelper.toAsyncInputStream(bytes)
+
+ val readStream = MongoDBAsyncStreamSource(asyncStream,
42).runWith(StreamConverters.asInputStream()) // 42 is the chunk size, so 4000 bytes need many reads
+ val readBytes = IOUtils.toByteArray(readStream)
+
+ bytes shouldBe readBytes
+ }
+
+ it should "close the stream when done" in {
+ val bytes = randomBytes(4000)
+ val inputStream = new ByteArrayInputStream(bytes)
+ val spiedStream = spy(inputStream) // spy so that the close() call can be verified below
+ val asyncStream = AsyncStreamHelper.toAsyncInputStream(spiedStream)
+
+ val readStream = MongoDBAsyncStreamSource(asyncStream,
42).runWith(StreamConverters.asInputStream())
+ val readBytes = IOUtils.toByteArray(readStream)
+
+ bytes shouldBe readBytes
+ verify(spiedStream).close()
+ }
+
+ it should "onError with failure and return a failed IOResult when reading
from failed stream" in {
+ val inputStream = mock[InputStream]
+
+ val exception = new IOException("Boom")
+ doThrow(exception).when(inputStream).read(any()) // every read on the mock throws
+ val asyncStream = AsyncStreamHelper.toAsyncInputStream(inputStream)
+
+ val (ioResult, p) =
MongoDBAsyncStreamSource(asyncStream).toMat(Sink.asPublisher(false))(Keep.both).run()
+ val c = TestSubscriber.manualProbe[ByteString]()
+ p.subscribe(c)
+
+ val sub = c.expectSubscription()
+ sub.request(1) // demand one element so that the source performs a read
+
+ val error = c.expectError()
+ error.getCause should be theSameInstanceAs exception
+
+ ioResult.futureValue.status.isFailure shouldBe true
+ }
+
+ behavior of "MongoDBAsyncStreamSink"
+
+ it should "write all bytes" in {
+ val bytes = randomBytes(4000)
+ val source = StreamConverters.fromInputStream(() => new
ByteArrayInputStream(bytes), 42)
+
+ val os = new ByteArrayOutputStream()
+ val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os)
+
+ val sink = MongoDBAsyncStreamSink(asyncStream)
+ val ioResult = source.toMat(sink)(Keep.right).run()
+
+ ioResult.futureValue.count shouldBe bytes.length
+
+ val writtenBytes = os.toByteArray
+ writtenBytes shouldBe bytes
+ }
+
+ it should "close the stream when done" in {
+ val bytes = randomBytes(4000)
+ val source = StreamConverters.fromInputStream(() => new
ByteArrayInputStream(bytes), 42)
+
+ val outputStream = new CloseRecordingStream()
+ val asyncStream = AsyncStreamHelper.toAsyncOutputStream(outputStream)
+
+ val sink = MongoDBAsyncStreamSink(asyncStream)
+ val ioResult = source.toMat(sink)(Keep.right).run()
+
+ ioResult.futureValue.count shouldBe 4000
+ outputStream.toByteArray shouldBe bytes
+ outputStream.closed shouldBe true
+ }
+
+ it should "onError with failure and return a failed IOResult when writing to
failed stream" in {
+ val os = new ByteArrayOutputStream()
+ val asyncStream = AsyncStreamHelper.toAsyncOutputStream(os)
+
+ val sink = MongoDBAsyncStreamSink(asyncStream)
+ val ioResult = Source(1 to 10)
+ .map { n ⇒
+ if (n == 7) throw new Error("bees!") // the failure is raised by the upstream at the 7th element, not by the output stream
+ n
+ }
+ .map(ByteString(_))
+ .runWith(sink)
+ ioResult.futureValue.status.isFailure shouldBe true
+ }
+
+ private def randomBytes(size: Int): Array[Byte] = { // array of the given size filled with random bytes
+ val arr = new Array[Byte](size)
+ Random.nextBytes(arr)
+ arr
+ }
+
+ private class CloseRecordingStream extends ByteArrayOutputStream { // remembers whether close() has been called
+ var closed: Boolean = _
+ override def close() = { super.close(); closed = true }
+ }
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala
new file mode 100644
index 0000000..31eb593
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBAttachmentStoreTests.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database.DocumentSerializer
+import org.apache.openwhisk.core.database.memory.MemoryAttachmentStoreProvider
+import
org.apache.openwhisk.core.database.test.behavior.ArtifactStoreAttachmentBehaviors
+import org.junit.runner.RunWith
+import org.scalatest.FlatSpec
+import org.scalatest.junit.JUnitRunner
+
+import scala.reflect.ClassTag
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBAttachmentStoreTests extends FlatSpec with
MongoDBStoreBehaviorBase with ArtifactStoreAttachmentBehaviors { // Runs the attachment behaviors with an in-memory attachment store passed to the MongoDB artifact stores
+ override protected def getAttachmentStore[D <: DocumentSerializer:
ClassTag]() =
+ Some(MemoryAttachmentStoreProvider.makeStore[D]())
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
new file mode 100644
index 0000000..d3dae92
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBStoreBehaviorBase.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.ConfigKeys
+import
org.apache.openwhisk.core.database.test.behavior.ArtifactStoreBehaviorBase
+import org.apache.openwhisk.core.database.{ArtifactStore, AttachmentStore,
DocumentSerializer}
+import org.apache.openwhisk.core.entity._
+import org.scalatest.FlatSpec
+import pureconfig.loadConfigOrThrow
+import pureconfig.generic.auto._
+
+import scala.reflect.{classTag, ClassTag}
+import scala.util.Try
+
+trait MongoDBStoreBehaviorBase extends FlatSpec with ArtifactStoreBehaviorBase
{ // Provides MongoDB-backed auth, entity and activation stores to the shared store behaviors
+ override def storeType = "MongoDB"
+
+ override lazy val storeAvailableCheck: Try[Any] = storeConfigTry // the availability check is the outcome of loading the MongoDB config
+
+ val storeConfigTry = Try {
loadConfigOrThrow[MongoDBConfig](ConfigKeys.mongodb) }
+
+ override lazy val authStore = {
+ implicit val docReader: DocumentReader = WhiskDocumentReader
+
MongoDBArtifactStoreProvider.makeArtifactStore[WhiskAuth](storeConfigTry.get,
getAttachmentStore[WhiskAuth]())
+ }
+
+ override lazy val entityStore = // same factory as the other stores, with the implicit arguments passed explicitly
+
MongoDBArtifactStoreProvider.makeArtifactStore[WhiskEntity](storeConfigTry.get,
getAttachmentStore[WhiskEntity]())(
+ classTag[WhiskEntity],
+ WhiskEntityJsonFormat,
+ WhiskDocumentReader,
+ actorSystem,
+ logging,
+ materializer)
+
+ override lazy val activationStore = {
+ implicit val docReader: DocumentReader = WhiskDocumentReader
+ MongoDBArtifactStoreProvider
+ .makeArtifactStore[WhiskActivation](storeConfigTry.get,
getAttachmentStore[WhiskActivation]())
+ }
+
+ override protected def getAttachmentStore(store: ArtifactStore[_]) = // reads the attachment store back from an existing MongoDB artifact store
+ store.asInstanceOf[MongoDBArtifactStore[_]].attachmentStore
+
+ protected def getAttachmentStore[D <: DocumentSerializer: ClassTag]():
Option[AttachmentStore] = None // no separate attachment store unless a subclass overrides this
+}
diff --git
a/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala
new file mode 100644
index 0000000..44b121b
--- /dev/null
+++
b/tests/src/test/scala/org/apache/openwhisk/core/database/mongodb/MongoDBViewMapperTests.scala
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.openwhisk.core.database.mongodb
+
+import org.apache.openwhisk.core.database.{UnsupportedQueryKeys,
UnsupportedView}
+import org.apache.openwhisk.core.entity.WhiskQueries.TOP
+import org.bson.conversions.Bson
+import org.junit.runner.RunWith
+import org.mongodb.scala.MongoClient
+import org.mongodb.scala.bson.BsonDocument
+import org.mongodb.scala.bson.collection.immutable.Document
+import org.mongodb.scala.model.Filters.{equal => meq, _}
+import org.mongodb.scala.model.Sorts
+import org.scalatest.{FlatSpec, Matchers, OptionValues}
+import org.scalatest.junit.JUnitRunner
+
+@RunWith(classOf[JUnitRunner])
+class MongoDBViewMapperTests extends FlatSpec with Matchers with OptionValues { // Checks the Bson produced by the three view mappers against hand-built filters and sorts
+ implicit class RichBson(val b: Bson) { // renders a Bson value as a BsonDocument so that expected and actual values can be compared
+ def toDoc: BsonDocument = b.toBsonDocument(classOf[Document],
MongoClient.DEFAULT_CODEC_REGISTRY)
+ }
+
+ behavior of "ActivationViewMapper filter"
+
+ it should "match all activations in namespace" in {
+ ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1"),
List("ns1", TOP)).toDoc shouldBe
+ meq("namespace", "ns1").toDoc
+ ActivationViewMapper.filter("whisks-filters.v2.1.0", "activations",
List("ns1"), List("ns1", TOP)).toDoc shouldBe
+ meq("_computed.nspath", "ns1").toDoc
+ }
+
+ it should "match all activations in namespace since zero" in {
+ ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1",
0), List("ns1", TOP, TOP)).toDoc shouldBe
+ and(meq("namespace", "ns1"), gte("start", 0)).toDoc
+
+ ActivationViewMapper
+ .filter("whisks-filters.v2.1.0", "activations", List("ns1", 0),
List("ns1", TOP, TOP))
+ .toDoc shouldBe
+ and(meq("_computed.nspath", "ns1"), gte("start", 0)).toDoc
+ }
+
+ it should "match all activations in namespace since some value" in {
+ ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1",
42), List("ns1", TOP, TOP)).toDoc shouldBe
+ and(meq("namespace", "ns1"), gte("start", 42)).toDoc
+
+ ActivationViewMapper
+ .filter("whisks-filters.v2.1.0", "activations", List("ns1", 42),
List("ns1", TOP, TOP))
+ .toDoc shouldBe
+ and(meq("_computed.nspath", "ns1"), gte("start", 42)).toDoc
+ }
+
+ it should "match all activations in namespace between 2 instants" in {
+ ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1",
42), List("ns1", 314, TOP)).toDoc shouldBe
+ and(meq("namespace", "ns1"), gte("start", 42), lte("start", 314)).toDoc
+
+ ActivationViewMapper
+ .filter("whisks-filters.v2.1.0", "activations", List("ns1", 42),
List("ns1", 314, TOP))
+ .toDoc shouldBe
+ and(meq("_computed.nspath", "ns1"), gte("start", 42), lte("start",
314)).toDoc
+ }
+
+ it should "throw UnsupportedQueryKeys for unknown keys" in {
+ intercept[UnsupportedQueryKeys] {
+ ActivationViewMapper.filter("whisks.v2.1.0", "activations", List("ns1"),
List("ns1", "foo"))
+ }
+ }
+
+ it should "throw UnsupportedView exception for unknown views" in {
+ intercept[UnsupportedView] {
+ ActivationViewMapper.filter("whisks.v2.1.0", "activation-foo",
List("ns1"), List("ns1", TOP))
+ }
+ }
+
+ behavior of "ActivationViewMapper sort"
+
+ it should "sort descending" in {
+ ActivationViewMapper.sort("whisks-filters.v2.1.0", "activations",
descending = true).value.toDoc shouldBe
+ Sorts.descending("_computed.nspath", "start").toDoc
+ ActivationViewMapper.sort("whisks.v2.1.0", "activations", descending =
true).value.toDoc shouldBe
+ Sorts.descending("namespace", "start").toDoc
+ }
+
+ it should "sort ascending" in {
+ ActivationViewMapper.sort("whisks-filters.v2.1.0", "activations",
descending = false).value.toDoc shouldBe
+ Sorts.ascending("_computed.nspath", "start").toDoc
+ ActivationViewMapper.sort("whisks.v2.1.0", "activations", descending =
false).value.toDoc shouldBe
+ Sorts.ascending("namespace", "start").toDoc
+ }
+
+ it should "throw UnsupportedView" in {
+ intercept[UnsupportedView] {
+ ActivationViewMapper.sort("whisks.v2.1.0", "activation-foo", descending
= true)
+ }
+ }
+
+ behavior of "WhisksViewMapper filter"
+
+ val whiskTypes = Seq( // pairs of (view name, expected entityType value)
+ ("actions", "action"),
+ ("packages", "package"),
+ ("packages-public", "package"),
+ ("rules", "rule"),
+ ("triggers", "trigger"))
+
+ it should "match entities of specific type in namespace" in {
+ whiskTypes.foreach {
+ case (view, entityType) =>
+ var filters =
+ or(
+ and(meq("entityType", entityType), meq("namespace", "ns1")),
+ and(meq("entityType", entityType), meq("_computed.rootns", "ns1")))
+ if (view == "packages-public") // the public-package view wraps the filter in the binding/publish condition
+ filters = getPublicPackageFilter(filters)
+ WhisksViewMapper.filter("whisks.v2.1.0", view, List("ns1"),
List("ns1", TOP)).toDoc shouldBe filters.toDoc
+ }
+ }
+
+ it should "match entities of specific type in namespace and updated since"
in {
+ whiskTypes.foreach {
+ case (view, entityType) =>
+ var filters =
+ or(
+ and(meq("entityType", entityType), meq("namespace", "ns1"),
gte("updated", 42)),
+ and(meq("entityType", entityType), meq("_computed.rootns", "ns1"),
gte("updated", 42)))
+ if (view == "packages-public")
+ filters = getPublicPackageFilter(filters)
+ WhisksViewMapper
+ .filter("whisks.v2.1.0", view, List("ns1", 42), List("ns1", TOP,
TOP))
+ .toDoc shouldBe filters.toDoc
+ }
+ }
+
+ it should "match all entities of specific type in namespace and between" in {
+ whiskTypes.foreach {
+ case (view, entityType) =>
+ var filters =
+ or(
+ and(meq("entityType", entityType), meq("namespace", "ns1"),
gte("updated", 42), lte("updated", 314)),
+ and(meq("entityType", entityType), meq("_computed.rootns", "ns1"),
gte("updated", 42), lte("updated", 314)))
+ if (view == "packages-public")
+ filters = getPublicPackageFilter(filters)
+ WhisksViewMapper
+ .filter("whisks.v2.1.0", view, List("ns1", 42), List("ns1", 314,
TOP))
+ .toDoc shouldBe filters.toDoc
+ }
+ }
+
+ it should "match all entities in namespace" in {
+ WhisksViewMapper.filter("whisks.v2.1.0", "all", List("ns1"), List("ns1",
TOP)).toDoc shouldBe
+ and(exists("entityType"), meq("_computed.rootns", "ns1")).toDoc
+ }
+
+ it should "throw UnsupportedQueryKeys for unknown keys" in {
+ intercept[UnsupportedQueryKeys] {
+ WhisksViewMapper.filter("whisks.v2.1.0", "actions", List("ns1"),
List("ns1", "foo"))
+ }
+ intercept[UnsupportedQueryKeys] {
+ WhisksViewMapper.filter("whisks.v2.1.0", "all", List("ns1"), List("ns1",
"foo"))
+ }
+ }
+
+ it should "throw UnsupportedView exception for unknown views" in {
+ intercept[UnsupportedView] {
+ WhisksViewMapper.filter("whisks.v2.1.0", "actions-foo", List("ns1"),
List("ns1", TOP))
+ }
+ }
+
+ behavior of "WhisksViewMapper sort"
+
+ it should "sort descending" in {
+ whiskTypes.foreach {
+ case (view, _) =>
+ WhisksViewMapper.sort("whisks.v2.1.0", view, descending =
true).value.toDoc shouldBe
+ Sorts.descending("updated").toDoc
+ }
+ }
+
+ it should "sort ascending" in {
+ whiskTypes.foreach {
+ case (view, _) =>
+ WhisksViewMapper.sort("whisks.v2.1.0", view, descending =
false).value.toDoc shouldBe
+ Sorts.ascending("updated").toDoc
+ }
+ }
+
+ it should "throw UnsupportedView" in {
+ intercept[UnsupportedView] {
+ WhisksViewMapper.sort("whisks.v2.1.0", "action-foo", descending = true)
+ }
+ }
+
+ behavior of "SubjectViewMapper filter"
+
+ it should "match by subject or namespace" in {
+ SubjectViewMapper.filter("subjects", "identities", List("foo"),
List("foo")).toDoc shouldBe
+ and(notEqual("blocked", true), or(meq("subject", "foo"),
meq("namespaces.name", "foo"))).toDoc
+ }
+
+ it should "match by uuid and key" in {
+ SubjectViewMapper.filter("subjects", "identities", List("u1", "k1"),
List("u1", "k1")).toDoc shouldBe
+ and(
+ notEqual("blocked", true),
+ or(and(meq("uuid", "u1"), meq("key", "k1")),
and(meq("namespaces.uuid", "u1"), meq("namespaces.key", "k1")))).toDoc
+ }
+
+ it should "match by blocked or invocationsPerMinute or
concurrentInvocations" in {
+ SubjectViewMapper
+ .filter("namespaceThrottlings", "blockedNamespaces", List("u1", "k1"),
List("u1", "k1"))
+ .toDoc shouldBe
+ or(meq("blocked", true), meq("concurrentInvocations", 0),
meq("invocationsPerMinute", 0)).toDoc
+ }
+
+ it should "throw exception when keys are not same" in {
+ intercept[IllegalArgumentException] { // raised by the require on startKey == endKey
+ SubjectViewMapper.filter("subjects", "identities", List("u1", "k1"),
List("u1", "k2"))
+ }
+ }
+
+ it should "throw UnsupportedQueryKeys exception when keys are not know" in {
+ intercept[UnsupportedQueryKeys] {
+ SubjectViewMapper.filter("subjects", "identities", List("u1", "k1",
"foo"), List("u1", "k1", "foo"))
+ }
+ }
+
+ it should "throw UnsupportedView exception when view is not known" in {
+ intercept[UnsupportedView] {
+ SubjectViewMapper.filter("subjects", "identities-foo", List("u1", "k1",
"foo"), List("u1", "k1", "foo"))
+ }
+ }
+
+ behavior of "SubjectViewMapper sort"
+
+ it should "sort none" in {
+ SubjectViewMapper.sort("subjects", "identities", descending = true)
shouldBe None
+ SubjectViewMapper.sort("namespaceThrottlings", "blockedNamespaces",
descending = true) shouldBe None
+ }
+
+ private def getPublicPackageFilter(filters: Bson): Bson = { // expected extra condition for the packages-public view
+ and(meq("binding", Map.empty), meq("publish", true), filters)
+ }
+}
diff --git a/tools/build/README.md b/tools/build/README.md
index 0282482..ce5590f 100644
--- a/tools/build/README.md
+++ b/tools/build/README.md
@@ -31,6 +31,7 @@ The script is called `redo` because for most development, one
will want to "redo
- initialize environment and `docker-machine` (for mac): `redo setup prereq`
- start CouchDB container and initialize DB with system and guest keys: `redo
couchdb initdb`
- start ElasticSearch container to store activations: `redo elasticsearch`
+- start MongoDB container to serve as the database backend: `redo mongodb`
- build and deploy system: `redo deploy`
- run tests: `redo props tests`
diff --git a/tools/build/redo b/tools/build/redo
index 9ffa04e..b149c1a 100755
--- a/tools/build/redo
+++ b/tools/build/redo
@@ -241,6 +241,13 @@ Components = [
'deploy elasticsearch',
modes = 'clean'),
+ makeComponent('mongodb',
+ 'deploy mongodb',
+ modes = 'clean'),
+
+ makeComponent('initMongoDB',
+ 'initialize mongodb with guest/system keys'),
+
makeComponent('build',
'build system',
yaml = False,