Repository: incubator-ariatosca Updated Branches: refs/heads/SQLAlchemy-based-models 9eb95924c -> 49401998c (forced update)
unified APIs, added system model tests for filesystem MAPI and sql MAPI Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/49401998 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/49401998 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/49401998 Branch: refs/heads/SQLAlchemy-based-models Commit: 49401998c0d69ea68b048b7475bad4ffa5511bc1 Parents: 1bb1f6b Author: mxmrlv <[email protected]> Authored: Mon Nov 28 17:47:22 2016 +0200 Committer: mxmrlv <[email protected]> Committed: Mon Nov 28 18:05:21 2016 +0200 ---------------------------------------------------------------------- aria/__init__.py | 4 +- aria/orchestrator/context/common.py | 2 +- aria/storage/__init__.py | 26 +- aria/storage/api.py | 101 ++++++ aria/storage/api_driver/__init__.py | 4 +- aria/storage/api_driver/file_system.py | 229 ------------- aria/storage/api_driver/filesystem.py | 235 +++++++++++++ aria/storage/api_driver/in_memory.py | 146 --------- aria/storage/api_driver/inmemory.py | 143 ++++++++ aria/storage/api_driver/sql.py | 328 ++++++++++++++++++- aria/storage/models.py | 33 +- aria/storage/storage_api.py | 94 ------ aria/storage/structures.py | 23 +- aria/storage/testing.py | 19 ++ tests/mock/context.py | 5 +- tests/orchestrator/context/test_workflow.py | 2 +- .../orchestrator/workflows/builtin/__init__.py | 2 + .../orchestrator/workflows/core/test_engine.py | 2 +- tests/storage/test_model_storage.py | 48 ++- 19 files changed, 908 insertions(+), 538 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 4dd23c9..45e74df 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -58,14 +58,14 @@ def install_aria_extensions(): del sys.modules[module_name] -def application_model_storage(api): +def application_model_storage(api, api_params=None): """ Initiate model storage for the supplied storage driver """ if api not in _model_storage: _model_storage[api] = storage.ModelStorage( - api, items=[ + api, api_params=api_params or {}, items=[ storage.models.Node, storage.models.NodeInstance, storage.models.Plugin, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index f2bf83b..7b65e2b 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -79,7 +79,7 @@ class BaseContext(logger.LoggerMixin): """ The blueprint model """ - return self.model.blueprint.get(self.deployment.blueprint_id) + return self.deployment.blueprint @property def deployment(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index 9cb6c77..d0c555d 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -39,9 +39,10 @@ API: """ from aria.logger import LoggerMixin -from aria.storage import storage_api +from aria.storage import api from aria.storage.exceptions import StorageError -from . import models, exceptions, storage_api, structures +from .structures import db +from . import models, exceptions, api, structures __all__ = ( @@ -75,22 +76,25 @@ class Storage(LoggerMixin): return super(Storage, self).__getattribute__(item) def register(self, name): - raise NotImplementedError("BBBBBBB") + raise NotImplementedError('Subclass must implement abstract register method') class ResourceStorage(Storage): def register(self, name): - self.registered[name] = self.api(name=name, - **self._api_params) + self.registered[name] = self.api(name=name, **self._api_params) self.registered[name].create() self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self)) class ModelStorage(Storage): def register(self, model): - model_name = storage_api.generate_lower_name(model) - self.registered[model_name] = self.api(name=model_name, - model_cls=model, - **self._api_params) - self.registered[model_name].create() - self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self)) + model_name = api.generate_lower_name(model) + if model_name not in self.registered: + self.registered[model_name] = self.api(name=model_name, + model_cls=model, + **self._api_params) + self.registered[model_name].create() + self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self)) + else: + self.logger.debug('{name} in already storage {self!r}'.format(name=model_name, self=self)) + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/api.py ---------------------------------------------------------------------- diff --git a/aria/storage/api.py b/aria/storage/api.py new file mode 100644 index 0000000..8fee34a --- /dev/null +++ b/aria/storage/api.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from contextlib import contextmanager + +from . import exceptions + + +class StorageAPI(object): + def create(self, **kwargs): + raise NotImplementedError('Subclass must implement abstract create method') + + @contextmanager + def connect(self): + try: + self._establish_connection() + yield self + except BaseException as e: + raise exceptions.StorageError(str(e)) + finally: + self._destroy_connection() + + def _establish_connection(self): + pass + + def _destroy_connection(self): + pass + + +class ModelAPI(StorageAPI): + def __init__(self, model_cls, name=None, **kwargs): + super(ModelAPI, self).__init__(**kwargs) + self._model_cls = model_cls + self._name = name or generate_lower_name(model_cls) + + @property + def name(self): + return self._name + + @property + def model_cls(self): + return self._model_cls + + def get(self, entry_id, **kwargs): + raise NotImplementedError('Subclass must implement abstract get method') + + def store(self, entry, **kwargs): + raise NotImplementedError('Subclass must implement abstract store method') + + def delete(self, entry_id, **kwargs): + raise NotImplementedError('Subclass must implement abstract delete method') + + def __iter__(self): + return self.iter() + + def iter(self, **kwargs): + raise NotImplementedError('Subclass must implement abstract iter method') + + def update(self, entry_id, **kwargs): + raise NotImplementedError('Subclass must implement abstract update method') + + +class ResourceAPI(StorageAPI): + def __init__(self, name): + self._name = name + + @property + def name(self): + return self._name + + def data(self, entry_id, path=None, **kwargs): + raise NotImplementedError('Subclass must implement abstract data method') + + def download(self, entry_id, destination, path=None, **kwargs): + raise NotImplementedError('Subclass must implement abstract download method') + + def upload(self, entry_id, source, path=None, **kwargs): + raise NotImplementedError('Subclass must implement abstract upload method') + + +def generate_lower_name(model_cls): + """ + Generates the name of the class from the class object. e.g. SomeClass -> some_class + :param model_cls: the class to evaluate. + :return: lower name + :rtype: basestring + """ + return ''.join( + character if character.islower() else '_{0}'.format(character.lower()) + for character in model_cls.__name__)[1:] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/api_driver/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/api_driver/__init__.py b/aria/storage/api_driver/__init__.py index 9d0095b..127426a 100644 --- a/aria/storage/api_driver/__init__.py +++ b/aria/storage/api_driver/__init__.py @@ -13,6 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -from .file_system import FileSystemModelAPI, FileSystemResourceAPI -from .in_memory import InMemoryModelAPI +from .filesystem import FileSystemModelAPI, FileSystemResourceAPI +from .inmemory import InMemoryModelAPI from .sql import SQLAlchemyModelAPI http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/api_driver/file_system.py ---------------------------------------------------------------------- diff --git a/aria/storage/api_driver/file_system.py b/aria/storage/api_driver/file_system.py deleted file mode 100644 index 45ecbe8..0000000 --- a/aria/storage/api_driver/file_system.py +++ /dev/null @@ -1,229 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import shutil -from functools import partial -from multiprocessing import RLock -from distutils import dir_util - -import jsonpickle - -from .. import ( - storage_api, - StorageError -) - - -# TODO: fix docs - - -class FileSystemAPI(storage_api.StorageAPI): - """ - Base class which handles storage on the file system. - """ - def __init__(self, *args, **kwargs): - super(FileSystemAPI, self).__init__(*args, **kwargs) - self._lock = RLock() - - def _establish_connection(self): - self._lock.acquire() - - def _destroy_connection(self): - self._lock.release() - - def __getstate__(self): - obj_dict = super(FileSystemAPI, self).__getstate__() - del obj_dict['_lock'] - return obj_dict - - def __setstate__(self, obj_dict): - super(FileSystemAPI, self).__setstate__(obj_dict) - vars(self).update(_lock=RLock(), **obj_dict) - - -class FileSystemModelAPI(storage_api.ModelAPI, FileSystemAPI): - """ - File system model storage. - """ - - def __init__(self, directory, **kwargs): - """ - File system implementation for storage api. - :param str directory: root dir for storage. - """ - super(FileSystemModelAPI, self).__init__(**kwargs) - self.directory = directory - - self._join_path = partial(os.path.join, self.directory) - - def __repr__(self): - return '{cls.__name__}(directory={self.directory})'.format( - cls=self.__class__, self=self) - - def create(self, **kwargs): - """ - Create directory in storage by path. - tries to create the root directory as well. - :param str name: path of file in storage. - """ - try: - os.makedirs(self.directory) - except (OSError, IOError): - pass - os.makedirs(self._join_path(self.name)) - - def get(self, entry_id, **kwargs): - """ - Getter from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to get from storage. - :return: value of file from storage. - :rtype: dict - """ - with open(self._join_path(self.name, entry_id)) as file_obj: - return jsonpickle.loads(file_obj.read()) - - def store(self, entry, **kwargs): - """ - Delete from storage. - :param Model entry: name of directory in storage. - """ - with open(self._join_path(self.name, entry.id), 'w') as file_obj: - file_obj.write(jsonpickle.dumps(entry)) - - def delete(self, entry_id, **kwargs): - """ - Delete from storage. - :param str name: name of directory in storage. - :param str entry_id: id of the file to delete from storage. - """ - os.remove(self._join_path(self.name, entry_id)) - - def iter(self, filters=None, **kwargs): - """ - Generator over the entries of directory in storage. - :param dict filters: filters for query - """ - filters = filters or {} - - for entry_id in os.listdir(self._join_path(self.name)): - value = self.get(self.name, entry_id=entry_id) - for filter_name, filter_value in filters.items(): - if value.get(filter_name) != filter_value: - break - else: - yield value - - def update(self, entry_id, **kwargs): - """ - Updates and entry in storage. - - :param str name: name of table/document in storage. - :param str entry_id: id of the document to get from storage. - :param kwargs: the fields to update. - :return: - """ - entry_dict = self.get(entry_id) - entry_dict.update(**kwargs) - self.store(entry_dict) - - -class FileSystemResourceAPI(storage_api.ResourceAPI, FileSystemAPI): - """ - File system resource storage. - """ - - def __init__(self, directory, **kwargs): - """ - File system implementation for storage api. - :param str directory: root dir for storage. - """ - super(FileSystemResourceAPI, self).__init__(**kwargs) - self.directory = directory - self._join_path = partial(os.path.join, self.directory) - - def __repr__(self): - return '{cls.__name__}(directory={self.directory})'.format( - cls=self.__class__, self=self) - - def create(self, **kwargs): - """ - Create directory in storage by path. - tries to create the root directory as well. - :param str name: path of file in storage. - """ - try: - os.makedirs(self.directory) - except (OSError, IOError): - pass - os.makedirs(self._join_path(self.name)) - - def data(self, entry_id, path=None, **_): - """ - Retrieve the content of a file system storage resource. - - :param str entry_type: the type of the entry. - :param str entry_id: the id of the entry. - :param str path: a path to a specific resource. - :return: the content of the file - :rtype: bytes - """ - resource_relative_path = os.path.join(self.name, entry_id, path or '') - resource = os.path.join(self.directory, resource_relative_path) - if not os.path.exists(resource): - raise StorageError("Resource {0} does not exist".format(resource_relative_path)) - if not os.path.isfile(resource): - resources = os.listdir(resource) - if len(resources) != 1: - raise StorageError('No resource in path: {0}'.format(resource)) - resource = os.path.join(resource, resources[0]) - with open(resource, 'rb') as resource_file: - return resource_file.read() - - def download(self, entry_id, destination, path=None, **_): - """ - Download a specific file or dir from the file system resource storage. - - :param str entry_type: the name of the entry. - :param str entry_id: the id of the entry - :param str destination: the destination of the files. - :param str path: a path on the remote machine relative to the root of the entry. - """ - resource_relative_path = os.path.join(self.name, entry_id, path or '') - resource = os.path.join(self.directory, resource_relative_path) - if not os.path.exists(resource): - raise StorageError("Resource {0} does not exist".format(resource_relative_path)) - if os.path.isfile(resource): - shutil.copy2(resource, destination) - else: - dir_util.copy_tree(resource, destination) # pylint: disable=no-member - - def upload(self, entry_id, source, path=None, **_): - """ - Uploads a specific file or dir to the file system resource storage. - - :param str entry_type: the name of the entry. - :param str entry_id: the id of the entry - :param source: the source of the files to upload. - :param path: the destination of the file/s relative to the entry root dir. - """ - resource_directory = os.path.join(self.directory, self.name, entry_id) - if not os.path.exists(resource_directory): - os.makedirs(resource_directory) - destination = os.path.join(resource_directory, path or '') - if os.path.isfile(source): - shutil.copy2(source, destination) - else: - dir_util.copy_tree(source, destination) # pylint: disable=no-member http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/api_driver/filesystem.py ---------------------------------------------------------------------- diff --git a/aria/storage/api_driver/filesystem.py b/aria/storage/api_driver/filesystem.py new file mode 100644 index 0000000..6ea2222 --- /dev/null +++ b/aria/storage/api_driver/filesystem.py @@ -0,0 +1,235 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import shutil +from functools import partial +from multiprocessing import RLock +from distutils import dir_util + +import jsonpickle + +from .. import ( + api, + StorageError +) + + +# TODO: fix docs + + +class FileSystemAPI(api.StorageAPI): + """ + Base class which handles storage on the file system. + """ + def __init__(self, *args, **kwargs): + super(FileSystemAPI, self).__init__(*args, **kwargs) + self._lock = RLock() + + def _establish_connection(self): + self._lock.acquire() + + def _destroy_connection(self): + self._lock.release() + + def __getstate__(self): + obj_dict = super(FileSystemAPI, self).__getstate__() + del obj_dict['_lock'] + return obj_dict + + def __setstate__(self, obj_dict): + super(FileSystemAPI, self).__setstate__(obj_dict) + vars(self).update(_lock=RLock(), **obj_dict) + + +class FileSystemModelAPI(api.ModelAPI, FileSystemAPI): + """ + File system model storage. + """ + + def __init__(self, directory, **kwargs): + """ + File system implementation for storage api. + :param str directory: root dir for storage. + """ + super(FileSystemModelAPI, self).__init__(**kwargs) + self.directory = directory + self.base_path = os.path.join(self.directory, self.name) + self._join_path = partial(os.path.join, self.base_path) + + def __repr__(self): + return '{cls.__name__}(directory={self.directory})'.format( + cls=self.__class__, self=self) + + def create(self, **kwargs): + """ + Create directory in storage by path. + tries to create the root directory as well. + :param str name: path of file in storage. + """ + with self.connect(): + try: + os.makedirs(self.directory) + except (OSError, IOError): + pass + os.makedirs(self.base_path) + + def get(self, entry_id, **kwargs): + """ + Getter from storage. + :param str entry_id: id of the file to get from storage. + :return: value of file from storage. + :rtype: dict + """ + with self.connect(): + with open(self._join_path(entry_id)) as file_obj: + return jsonpickle.loads(file_obj.read()) + + def store(self, entry, **kwargs): + """ + Delete from storage. + :param Model entry: name of directory in storage. + """ + with self.connect(): + with open(self._join_path(entry.id), 'w') as file_obj: + file_obj.write(jsonpickle.dumps(entry)) + + def delete(self, entry_id, **kwargs): + """ + Delete from storage. + :param str name: name of directory in storage. + :param str entry_id: id of the file to delete from storage. + """ + with self.connect(): + os.remove(self._join_path(entry_id)) + + def iter(self, filters=None, **kwargs): + """ + Generator over the entries of directory in storage. + :param dict filters: filters for query + """ + filters = filters or {} + with self.connect(): + for entry_id in os.listdir(self.base_path): + value = self.get(entry_id=entry_id) + for filter_name, filter_value in filters.items(): + if value.get(filter_name) != filter_value: + break + else: + yield value + + def update(self, entry_id, **kwargs): + """ + Updates and entry in storage. + + :param str name: name of table/document in storage. + :param str entry_id: id of the document to get from storage. + :param kwargs: the fields to update. + :return: + """ + with self.connect(): + entry = self.get(entry_id) + for key, value in kwargs.items(): + setattr(entry, key, value) + self.store(entry) + + +class FileSystemResourceAPI(api.ResourceAPI, FileSystemAPI): + """ + File system resource storage. + """ + + def __init__(self, directory, **kwargs): + """ + File system implementation for storage api. + :param str directory: root dir for storage. + """ + super(FileSystemResourceAPI, self).__init__(**kwargs) + self.directory = directory + self.base_path = os.path.join(self.directory, self.name) + self._join_path = partial(os.path.join, self.base_path) + + def __repr__(self): + return '{cls.__name__}(directory={self.directory})'.format( + cls=self.__class__, self=self) + + def create(self, **kwargs): + """ + Create directory in storage by path. + tries to create the root directory as well. + :param str name: path of file in storage. + """ + try: + os.makedirs(self.directory) + except (OSError, IOError): + pass + os.makedirs(self.base_path) + + def data(self, entry_id, path=None, **_): + """ + Retrieve the content of a file system storage resource. + + :param str entry_type: the type of the entry. + :param str entry_id: the id of the entry. + :param str path: a path to a specific resource. + :return: the content of the file + :rtype: bytes + """ + resource_relative_path = os.path.join(self.name, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise StorageError("Resource {0} does not exist".format(resource_relative_path)) + if not os.path.isfile(resource): + resources = os.listdir(resource) + if len(resources) != 1: + raise StorageError('No resource in path: {0}'.format(resource)) + resource = os.path.join(resource, resources[0]) + with open(resource, 'rb') as resource_file: + return resource_file.read() + + def download(self, entry_id, destination, path=None, **_): + """ + Download a specific file or dir from the file system resource storage. + + :param str entry_type: the name of the entry. + :param str entry_id: the id of the entry + :param str destination: the destination of the files. + :param str path: a path on the remote machine relative to the root of the entry. + """ + resource_relative_path = os.path.join(self.name, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise StorageError("Resource {0} does not exist".format(resource_relative_path)) + if os.path.isfile(resource): + shutil.copy2(resource, destination) + else: + dir_util.copy_tree(resource, destination) # pylint: disable=no-member + + def upload(self, entry_id, source, path=None, **_): + """ + Uploads a specific file or dir to the file system resource storage. + + :param str entry_type: the name of the entry. + :param str entry_id: the id of the entry + :param source: the source of the files to upload. + :param path: the destination of the file/s relative to the entry root dir. + """ + resource_directory = os.path.join(self.directory, self.name, entry_id) + if not os.path.exists(resource_directory): + os.makedirs(resource_directory) + destination = os.path.join(resource_directory, path or '') + if os.path.isfile(source): + shutil.copy2(source, destination) + else: + dir_util.copy_tree(source, destination) # pylint: disable=no-member http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/api_driver/in_memory.py ---------------------------------------------------------------------- diff --git a/aria/storage/api_driver/in_memory.py b/aria/storage/api_driver/in_memory.py deleted file mode 100644 index b2d76f1..0000000 --- a/aria/storage/api_driver/in_memory.py +++ /dev/null @@ -1,146 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from collections import namedtuple - -from aria.storage.structures import db - -from .. import storage_api - - -_Pointer = namedtuple('_Pointer', 'name, is_iter') - -storage = {} - - -class InMemoryModelAPI(storage_api.ModelAPI): - def __init__(self, *args, **kwargs): - """ - Managing the model in the storage, using the driver. - - :param basestring name: the name of the model. - :param ModelDriver driver: the driver which supports this model in the storage. - :param Model model_cls: table/document class model. - """ - super(InMemoryModelAPI, self).__init__(*args, **kwargs) - self.pointer_mapping = {} - - def __iter__(self): - return self.iter() - - def create(self): - """ - Creates the model in the storage. - """ - with self.connect(): - storage[self.name] = {} - self._setup_pointers_mapping() - - def _setup_pointers_mapping(self,): - for field_name, field_cls in vars(self.model_cls).items(): - if not(isinstance(field_cls, db.RelationshipProperty) and field_cls.type): - continue - pointer_key = _Pointer(field_name, is_iter=False) - self.pointer_mapping[pointer_key] = self.__class__( - name=storage_api.generate_lower_name(field_cls.type), - model_cls=field_cls.type) - - def get(self, entry_id, **kwargs): - """ - Getter for the model from the storage. - - :param basestring entry_id: the id of the table/document. - :return: model instance - :rtype: Model - """ - with self.connect(): - data = storage[self.name][entry_id] - data.update(self._get_pointers(data, **kwargs)) - return self.model_cls(**data) - - def store(self, entry, **kwargs): - """ - Setter for the model in the storage. - - :param Model entry: the table/document to store. - """ - with self.connect(): - data = entry.fields_dict - data.update(self._store_pointers(data, **kwargs)) - storage[self.name][entry.id] = data - - def delete(self, entry_id, **kwargs): - """ - Delete the model from storage. - - :param basestring entry_id: id of the entity to delete from storage. - """ - entry = self.get(entry_id) - with self.connect(): - self._delete_pointers(entry, **kwargs) - storage[self.name].pop(entry_id) - - def iter(self, **kwargs): - """ - Generator over the entries of model in storage. - """ - with self.connect(): - for data in storage[self.name].values(): - data.update(self._get_pointers(data, **kwargs)) - yield self.model_cls(**data) - - def update(self, entry_id, **kwargs): - """ - Updates and entry in storage. - - :param str entry_id: the id of the table/document. - :param kwargs: the fields to update. - :return: - """ - with self.connect(): - storage[self.name][entry_id].update(**kwargs) - - def _get_pointers(self, data, **kwargs): - pointers = {} - for field, schema in self.pointer_mapping.items(): - if field.is_iter: - pointers[field.name] = [ - schema.get(entry_id=pointer_id, **kwargs) - for pointer_id in data[field.name] - if pointer_id] - elif data[field.name]: - pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs) - return pointers - - def _store_pointers(self, data, **kwargs): - pointers = {} - for field, model_api in self.pointer_mapping.items(): - if field.is_iter: - pointers[field.name] = [] - for iter_entity in data[field.name]: - pointers[field.name].append(iter_entity.id) - model_api.store(iter_entity, **kwargs) - else: - pointers[field.name] = data[field.name].id - model_api.store(data[field.name], **kwargs) - return pointers - - def _delete_pointers(self, entry, **kwargs): - for field, schema in self.pointer_mapping.items(): - if field.is_iter: - for iter_entry in getattr(entry, field.name): - schema.delete(iter_entry.id, **kwargs) - else: - schema.delete(getattr(entry, field.name).id, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/api_driver/inmemory.py ---------------------------------------------------------------------- diff --git a/aria/storage/api_driver/inmemory.py b/aria/storage/api_driver/inmemory.py new file mode 100644 index 0000000..1bd438d --- /dev/null +++ b/aria/storage/api_driver/inmemory.py @@ -0,0 +1,143 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple + +from aria.storage.structures import db + +from .. import api + + +_Pointer = namedtuple('_Pointer', 'name, is_iter') + +storage = {} + + +class InMemoryModelAPI(api.ModelAPI): + def __init__(self, *args, **kwargs): + """ + Managing the model in the storage, using the driver. + + :param basestring name: the name of the model. + :param ModelDriver driver: the driver which supports this model in the storage. + :param Model model_cls: table/document class model. + """ + super(InMemoryModelAPI, self).__init__(*args, **kwargs) + self.pointer_mapping = {} + + def create(self): + """ + Creates the model in the storage. + """ + with self.connect(): + storage[self.name] = {} + self._setup_pointers_mapping() + + def _setup_pointers_mapping(self,): + for field_name, field_cls in vars(self.model_cls).items(): + if not(isinstance(field_cls, db.RelationshipProperty) and field_cls.type): + continue + pointer_key = _Pointer(field_name, is_iter=False) + self.pointer_mapping[pointer_key] = self.__class__( + name=api.generate_lower_name(field_cls.type), + model_cls=field_cls.type) + + def get(self, entry_id, **kwargs): + """ + Getter for the model from the storage. + + :param basestring entry_id: the id of the table/document. + :return: model instance + :rtype: Model + """ + with self.connect(): + data = storage[self.name][entry_id] + data.update(self._get_pointers(data, **kwargs)) + return self.model_cls(**data) + + def store(self, entry, **kwargs): + """ + Setter for the model in the storage. + + :param Model entry: the table/document to store. + """ + with self.connect(): + data = entry.fields_dict + data.update(self._store_pointers(data, **kwargs)) + storage[self.name][entry.id] = data + + def delete(self, entry_id, **kwargs): + """ + Delete the model from storage. + + :param basestring entry_id: id of the entity to delete from storage. + """ + entry = self.get(entry_id) + with self.connect(): + self._delete_pointers(entry, **kwargs) + storage[self.name].pop(entry_id) + + def iter(self, **kwargs): + """ + Generator over the entries of model in storage. + """ + with self.connect(): + for data in storage[self.name].values(): + data.update(self._get_pointers(data, **kwargs)) + yield self.model_cls(**data) + + def update(self, entry_id, **kwargs): + """ + Updates and entry in storage. + + :param str entry_id: the id of the table/document. + :param kwargs: the fields to update. + :return: + """ + with self.connect(): + storage[self.name][entry_id].update(**kwargs) + + def _get_pointers(self, data, **kwargs): + pointers = {} + for field, schema in self.pointer_mapping.items(): + if field.is_iter: + pointers[field.name] = [ + schema.get(entry_id=pointer_id, **kwargs) + for pointer_id in data[field.name] + if pointer_id] + elif data[field.name]: + pointers[field.name] = schema.get(entry_id=data[field.name], **kwargs) + return pointers + + def _store_pointers(self, data, **kwargs): + pointers = {} + for field, model_api in self.pointer_mapping.items(): + if field.is_iter: + pointers[field.name] = [] + for iter_entity in data[field.name]: + pointers[field.name].append(iter_entity.id) + model_api.store(iter_entity, **kwargs) + else: + pointers[field.name] = data[field.name].id + model_api.store(data[field.name], **kwargs) + return pointers + + def _delete_pointers(self, entry, **kwargs): + for field, schema in self.pointer_mapping.items(): + if field.is_iter: + for iter_entry in getattr(entry, field.name): + schema.delete(iter_entry.id, **kwargs) + else: + schema.delete(getattr(entry, field.name).id, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/api_driver/sql.py ---------------------------------------------------------------------- diff --git a/aria/storage/api_driver/sql.py b/aria/storage/api_driver/sql.py index f9bc4fe..ef11eb1 100644 --- a/aria/storage/api_driver/sql.py +++ b/aria/storage/api_driver/sql.py @@ -12,33 +12,333 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from .. import storage_api +from collections import OrderedDict +from sqlite3 import DatabaseError as SQLiteDBError +from sqlalchemy.exc import SQLAlchemyError +from sqlalchemy.sql.elements import Label -class SQLAlchemyModelAPI(storage_api.ModelAPI): - def _destroy_connection(self): - pass +try: + from psycopg2 import DatabaseError as Psycopg2DBError + sql_errors = (SQLAlchemyError, SQLiteDBError, Psycopg2DBError) +except ImportError: + sql_errors = (SQLAlchemyError, SQLiteDBError) + Psycopg2DBError = None - def _establish_connection(self): - pass +from ... import storage + + +class SQLAlchemyModelAPI(storage.api.ModelAPI): + + def get(self, entry_id, include=None, filters=None, locking=False): + """Return a single result based on the model class and element ID + """ + filters = filters or {'id': entry_id} + query = self._get_query(self.model_cls, include, filters) + if locking: + query = query.with_for_update() + result = query.first() + + if not result: + raise storage.exceptions.StorageError( + 'Requested {0} with ID `{1}` was not found' + .format(self.model_cls.__name__, entry_id) + ) + return result + + def list(self, + include=None, + filters=None, + pagination=None, + sort=None): + """Return a (possibly empty) list of `model_class` results + """ + query = self._get_query(include, filters, sort) + + results, total, size, offset = self._paginate(query, pagination) + + return iter(results) def store(self, entry, **kwargs): - pass + """Create a `model_class` instance from a serializable `model` object - def create(self): - pass + :param entry: A dict with relevant kwargs, or an instance of a class + that has a `to_dict` method, and whose attributes match the columns + of `model_class` (might also my just an instance of `model_class`) + :return: An instance of `model_class` + """ + self.update(entry) + return entry + def delete(self, entry_id, filters=None): + """Delete a single result based on the model class and element ID + """ + try: + instance = self.get( + entry_id, + filters=filters + ) + except storage.exceptions.StorageError: + raise storage.exceptions.StorageError( + 'Could not delete {0} with ID `{1}` - element not found' + .format( + self.model_cls.__name__, + entry_id + ) + ) + self._load_properties(instance) + storage.db.session.delete(instance) + self._safe_commit() + return instance + + # TODO: this might need rework def update(self, entry_id, **kwargs): - pass + """Add `instance` to the DB session, and attempt to commit - def delete(self, entry_id, **kwargs): - pass + :param entry_id: Instance to be updated in the DB + :return: The updated instance + """ + storage.db.session.add(entry_id) + self._safe_commit() + return entry_id + + def refresh(self, entry): + """Reload the instance with fresh information from the DB - def iter(self, **kwargs): + :param entry: Instance to be re-loaded from the DB + :return: The refreshed instance + """ + storage.db.session.refresh(entry) + self._load_properties(entry) + return entry + + def _destroy_connection(self): pass - def get(self, entry_id, **kwargs): + def _establish_connection(self): pass + def create(self): + # TODO: This should be reworked + storage.db.create_all() + + @staticmethod + def _safe_commit(): + """Try to commit changes in the session. Roll back if exception raised + Excepts SQLAlchemy errors and rollbacks if they're caught + """ + try: + storage.db.session.commit() + except sql_errors as e: + storage.db.session.rollback() + raise storage.exceptions.StorageError( + 'SQL Storage error: {0}'.format(str(e)) + ) + + def _get_base_query(self, include, joins): + """Create the initial query from the model class and included columns + + :param include: A (possibly empty) list of columns to include in + the query + :param joins: A (possibly empty) list of models on which the query + should join + :return: An SQLAlchemy AppenderQuery object + """ + + # If only some columns are included, query through the session object + if include: + query = storage.db.session.query(*include) + else: + # If all columns should be returned, query directly from the model + query = self.model_cls.query + + # Add any joins that might be necessary + for join_model in joins: + query = query.join(join_model) + + return query + + @staticmethod + def _sort_query(query, sort=None): + """Add sorting clauses to the query + + :param query: Base SQL query + :param sort: An optional dictionary where keys are column names to + sort by, and values are the order (asc/desc) + :return: An SQLAlchemy AppenderQuery object + """ + if sort: + for column, order in sort.items(): + if order == 'desc': + column = column.desc() + query = query.order_by(column) + return query + + @staticmethod + def _filter_query(query, filters): + """Add filter clauses to the query + + :param query: Base SQL query + :param filters: An optional dictionary where keys are column names to + filter by, and values are values applicable for those columns (or lists + of such values) + :return: An SQLAlchemy AppenderQuery object + """ + for column, value in filters.items(): + # If there are multiple values, use `in_`, otherwise, use `eq` + if isinstance(value, (list, tuple)): + query = query.filter(column.in_(value)) + else: + query = query.filter(column == value) + + return query + + def _get_query(self, + include=None, + filters=None, + sort=None): + """Get an SQL query object based on the params passed + + :param include: An optional list of columns to include in the query + :param filters: An optional dictionary where keys are column names to + filter by, and values are values applicable for those columns (or lists + of such values) + :param sort: An optional dictionary where keys are column names to + sort by, and values are the order (asc/desc) + :return: A sorted and filtered query with only the relevant + columns + """ + + include = include or [] + filters = filters or dict() + sort = sort or OrderedDict() + + joins = self._get_join_models_list(include, filters, sort) + include, filters, sort = self._get_columns_from_field_names( + include, filters, sort + ) + + query = self._get_base_query(include, joins) + query = self._filter_query(query, filters) + query = self._sort_query(query, sort) + return query + + def _get_columns_from_field_names(self, + include, + filters, + sort): + """Go over the optional parameters (include, filters, sort), and + replace column names with actual SQLA column objects + """ + all_includes = [self._get_column(c) for c in include] + include = [] + # Columns that are inferred from properties (Labels) should be included + # last for the following joins to work properly + for col in all_includes: + if isinstance(col, Label): + include.append(col) + else: + include.insert(0, col) + + filters = {self._get_column(c): filters[c] for c in filters} + sort = OrderedDict((self._get_column(c), sort[c]) for c in sort) + + return include, filters, sort + + def _get_join_models_list(self, include, filters, sort): + """Return a list of models on which the query should be joined, as + inferred from the include, filter and sort column names + """ + if not self.model_cls.is_resource: + return [] + + all_column_names = include + filters.keys() + sort.keys() + join_columns = {column_name for column_name in all_column_names + if self._is_join_column(column_name)} + + # If the only columns included are the columns on which we would + # normally join, there isn't actually a need to join, as the FROM + # clause in the query will be generated from the relevant models anyway + if include == list(join_columns): + return [] + + # Initializing a set, because the same model can appear in several + # join lists + join_models = set() + for column_name in join_columns: + join_models.update( + self.model_cls.join_properties[column_name]['models'] + ) + # Sort the models by their correct join order + join_models = sorted(join_models, + key=lambda model: model.join_order, reverse=True) + + return join_models + + def _is_join_column(self, column_name): + """Return False if the column name corresponds to a regular SQLA + column that `model_class` has. + Return True if the column that should be used is a join column (see + SQLModelBase for an explanation) + """ + return self.model_cls.is_resource and \ + column_name in self.model_cls.join_properties + + def _get_column(self, column_name): + """Return the column on which an action (filtering, sorting, etc.) + would need to be performed. Can be either an attribute of the class, + or needs to be inferred from the class' `join_properties` property + """ + if self._is_join_column(column_name): + return self.model_cls.join_properties[column_name]['column'] + else: + return getattr(self.model_cls, column_name) + + @staticmethod + def _paginate(query, pagination): + """Paginate the query by size and offset + + :param query: Current SQLAlchemy query object + :param pagination: An optional dict with size and offset keys + :return: A tuple with four elements: + - results: `size` items starting from `offset` + - the total count of items + - `size` [default: 0] + - `offset` [default: 0] + """ + if pagination: + size = pagination.get('size', 0) + offset = pagination.get('offset', 0) + total = query.order_by(None).count() # Fastest way to count + results = query.limit(size).offset(offset).all() + return results, total, size, offset + else: + results = query.all() + return results, len(results), 0, 0 + + @staticmethod + def _load_properties(instance): + """A helper method used to overcome a problem where the properties + that rely on joins aren't being loaded automatically + """ + if instance.is_resource: + for prop in instance.join_properties: + getattr(instance, prop) + + +class ListResult(object): + """ + a ListResult contains results about the requested items. + """ + def __init__(self, items, metadata): + self.items = items + self.metadata = metadata + + def __len__(self): + return len(self.items) + def __iter__(self): + return iter(self.items) + def __getitem__(self, item): + return self.items[item] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 17b0608..e2cf317 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -586,8 +586,8 @@ class Task(structures.SQLModelBase): id = structures.db.Column(structures.db.String, primary_key=True, default=uuid_generator) status = structures.db.Column(structures.db.Enum(*states.TaskState.STATES), - name='task_status', - default=states.TaskState.PENDING) + name='task_status', + default=states.TaskState.PENDING) execution_id = structures.db.Column(structures.db.String) due_at = structures.db.Column(structures.UTCDateTime, default=datetime.utcnow) @@ -601,5 +601,32 @@ class Task(structures.SQLModelBase): # Operation specific fields name = structures.db.Column(structures.db.String) operation_mapping = structures.db.Column(structures.db.String) - actor = structures.db.Column() inputs = structures.db.column(structures.db.PickleType(comparator=lambda *a: False)) + + +class NodeInstanceTask(Task): + id = structures.db.Column(structures.db.Integer, + structures.db.ForeignKey('task.id'), + primary_key=True) + + actor_storage_id = structures.foreign_key(NodeInstance) + actor = structures.one_to_many_relationship( + child_class_name='NodeInstanceTask', + column_name='actor_storage_id', + parent_class_name='NodeInstance', + back_reference_name='node_tasks', + ) + + +class RelationshipInstanceTask(Task): + id = structures.db.Column(structures.db.Integer, + structures.db.ForeignKey('task.id'), + primary_key=True) + + actor_storage_id = structures.foreign_key(RelationshipInstance) + actor = structures.one_to_many_relationship( + child_class_name='RelationshipInstanceTask', + column_name='actor_storage_id', + parent_class_name='RelationshipInstance', + back_reference_name='relationship_tasks', + ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/storage_api.py ---------------------------------------------------------------------- diff --git a/aria/storage/storage_api.py b/aria/storage/storage_api.py deleted file mode 100644 index 8fa6e7f..0000000 --- a/aria/storage/storage_api.py +++ /dev/null @@ -1,94 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from contextlib import contextmanager - -from . import exceptions - - -class StorageAPI(object): - def __init__(self, name, *args, **kwargs): - super(StorageAPI, self).__init__(*args, **kwargs) - self._name = name - - @property - def name(self): - return self._name - - def create(self, model_name, **kwargs): - raise NotImplementedError('Subclass must implement abstract create method') - - @contextmanager - def connect(self): - try: - self._establish_connection() - yield self - except BaseException as e: - raise exceptions.StorageError(str(e)) - finally: - self._destroy_connection() - - def _establish_connection(self): - pass - - def _destroy_connection(self): - pass - - -class ModelAPI(StorageAPI): - def __init__(self, model_cls, **kwargs): - super(ModelAPI, self).__init__(**kwargs) - self._model_cls = model_cls - - @property - def model_cls(self): - return self._model_cls - - def get(self, model_name, entry_id, **kwargs): - raise NotImplementedError('Subclass must implement abstract get method') - - def store(self, model_name, entry, **kwargs): - raise NotImplementedError('Subclass must implement abstract store method') - - def delete(self, model_name, entry_id, **kwargs): - raise NotImplementedError('Subclass must implement abstract delete method') - - def iter(self, model_name, **kwargs): - raise NotImplementedError('Subclass must implement abstract iter method') - - def update(self, model_name, entry_id, **kwargs): - raise NotImplementedError('Subclass must implement abstract update method') - - -class ResourceAPI(StorageAPI): - def data(self, entry_id, path=None, **kwargs): - raise NotImplementedError('Subclass must implement abstract data method') - - def download(self, entry_id, destination, path=None, **kwargs): - raise NotImplementedError('Subclass must implement abstract download method') - - def upload(self, entry_id, source, path=None, **kwargs): - raise NotImplementedError('Subclass must implement abstract upload method') - - -def generate_lower_name(model_cls): - """ - Generates the name of the class from the class object. e.g. SomeClass -> some_class - :param model_cls: the class to evaluate. - :return: lower name - :rtype: basestring - """ - return ''.join( - character if character.islower() else '_{0}'.format(character.lower()) - for character in model_cls.__name__)[1:] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index 922025c..11df33f 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -83,7 +83,7 @@ def one_to_many_relationship( column_name, parent_class_name, back_reference_name, - parent_id_name='storage_id' + parent_id_name='storage_id', ): """Return a one-to-many SQL relationship object Meant to be used from inside the *child* object @@ -166,24 +166,9 @@ class SQLModelBase(db.Model): # Indicates whether the `id` column in this class should be unique is_id_unique = True - def to_dict(self, suppress_error=False): - """Return a dict representation of the model - - :param suppress_error: If set to True, sets `None` to attributes that - it's unable to retrieve (e.g., if a relationship wasn't established - yet, and so it's impossible to access a property through it) - """ - if suppress_error: - res = dict() - for field in self.fields: - try: - field_value = getattr(self, field) - except AttributeError: - field_value = None - res[field] = field_value - else: - res = {field: getattr(self, field) for field in self.fields} - return res + @property + def to_dict(self): + return {field: getattr(self, field) for field in self.fields} def to_json(self): return jsonpickle.encode(self.to_dict(), unpicklable=False) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/aria/storage/testing.py ---------------------------------------------------------------------- diff --git a/aria/storage/testing.py b/aria/storage/testing.py new file mode 100644 index 0000000..bbe268e --- /dev/null +++ b/aria/storage/testing.py @@ -0,0 +1,19 @@ +import flask +from aria.storage import db +from aria import storage + +def create_app(user, password, db_name, host='localhost', port=5432): + app = flask.Flask('app') + app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite://' + # app.config['SQLALCHEMY_DATABASE_URI'] = \ + # 'postgresql://{user}:{password}@{host}:{port}/{db_name}'.\ + # format(user=user, password=password, db_name=db_name, host=host, port=port) + db.init_app(app) + db.app=app + with app.app_context(): + db.create_all() + +create_app('user', 'pass', 'tennis') +db.session.add(storage.models.ProviderContext(name='a', context={'a': 1})) +q = db.session.query(storage.models.Blueprint) +pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index 0790ba6..b74ad0c 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -17,12 +17,11 @@ from aria import application_model_storage from aria.orchestrator import context from . import models -from aria.storage.api_driver.in_memory import InMemoryModelAPI +from aria.storage.api_driver.inmemory import InMemoryModelAPI def simple(**kwargs): - storage = application_model_storage(InMemoryModelAPI()) - storage.setup() + storage = application_model_storage(InMemoryModelAPI) storage.blueprint.store(models.get_blueprint()) storage.deployment.store(models.get_deployment()) final_kwargs = dict( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/tests/orchestrator/context/test_workflow.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py index 51342e9..de0ce62 100644 --- a/tests/orchestrator/context/test_workflow.py +++ b/tests/orchestrator/context/test_workflow.py @@ -19,7 +19,7 @@ import pytest from aria import application_model_storage from aria.orchestrator import context -from aria.storage.api_driver.in_memory import InMemoryModelAPI +from aria.storage.api_driver.inmemory import InMemoryModelAPI from tests.mock import models http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/tests/orchestrator/workflows/builtin/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/__init__.py b/tests/orchestrator/workflows/builtin/__init__.py index e100432..5f2fa2e 100644 --- a/tests/orchestrator/workflows/builtin/__init__.py +++ b/tests/orchestrator/workflows/builtin/__init__.py @@ -76,6 +76,8 @@ def ctx_with_basic_graph(): relationship_instance=relationship_instance ) + simple_context.model.blueprint.store(blueprint) + simple_context.model.deployment.store(deployment) simple_context.model.node.store(dependent_node) simple_context.model.node.store(dependency_node) simple_context.model.node_instance.store(dependent_node_instance) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 1bcc1e3..e543bcb 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -33,7 +33,7 @@ from aria.orchestrator.workflows import ( ) from aria.orchestrator.workflows.core import engine from aria.orchestrator.workflows.executor import thread -from aria.storage.api_driver.in_memory import InMemoryModelAPI +from aria.storage.api_driver.inmemory import InMemoryModelAPI import tests.storage http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/49401998/tests/storage/test_model_storage.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_model_storage.py b/tests/storage/test_model_storage.py index 3288e93..dbfbc58 100644 --- a/tests/storage/test_model_storage.py +++ b/tests/storage/test_model_storage.py @@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import tempfile +import shutil + import pytest from aria import application_model_storage @@ -26,9 +29,27 @@ from aria.storage import ( ) -def test_storage_base(): - api = storage_api.InMemoryModelAPI - storage = ModelStorage(api) +temp_dir = tempfile.mkdtemp() + +APIs = [ + (storage_api.InMemoryModelAPI, dict()), + (storage_api.FileSystemModelAPI, dict(directory=temp_dir)), + (storage_api.SQLAlchemyModelAPI, dict()) +] + + [email protected](autouse=True) +def cleanup(): + yield + try: + shutil.rmtree(temp_dir, ignore_errors=True) + except BaseException: + pass + + [email protected]('api, api_params', APIs) +def test_storage_base(api, api_params): + storage = ModelStorage(api, api_params=api_params) assert storage.api == api @@ -36,8 +57,9 @@ def test_storage_base(): storage.non_existent_attribute() -def test_model_storage(): - storage = ModelStorage(storage_api.InMemoryModelAPI) [email protected]('api, api_params', APIs) +def test_model_storage(api, api_params): + storage = ModelStorage(api, api_params=api_params) storage.register(models.ProviderContext) pc = models.ProviderContext(context={}, name='context_name', id='id1') @@ -45,8 +67,8 @@ def test_model_storage(): assert storage.provider_context.get('id1').fields_dict == pc.fields_dict - assert [pc_from_storage for pc_from_storage in storage.provider_context.iter()] == [pc] - assert [pc_from_storage for pc_from_storage in storage.provider_context] == [pc] + assert [pc_from_storage.to_dict for pc_from_storage in storage.provider_context.iter()] == [pc.to_dict] + assert [pc_from_storage.to_dict for pc_from_storage in storage.provider_context] == [pc.to_dict] storage.provider_context.update('id1', context={'update_key': 0}) assert storage.provider_context.get('id1').context == {'update_key': 0} @@ -56,8 +78,9 @@ def test_model_storage(): storage.provider_context.get('id1') -def test_storage_driver(): - storage = ModelStorage(storage_api.InMemoryModelAPI) [email protected]('api, api_params', APIs) +def test_storage_driver(api, api_params): + storage = ModelStorage(api, api_params=api_params) storage.register(models.ProviderContext) pc = models.ProviderContext(context={}, name='context_name', id='id2') @@ -74,8 +97,9 @@ def test_storage_driver(): storage.registered['provider_context'].get('id2') -def test_application_storage_factory(): - storage = application_model_storage(storage_api.InMemoryModelAPI) [email protected]('api, api_params', APIs) +def test_application_storage_factory(api, api_params): + storage = application_model_storage(api, api_params=api_params) assert storage.node assert storage.node_instance assert storage.plugin @@ -88,7 +112,7 @@ def test_application_storage_factory(): assert storage.execution assert storage.provider_context - reused_storage = application_model_storage(storage_api.InMemoryModelAPI) + reused_storage = application_model_storage(api, api_params=api_params) assert reused_storage == storage
