Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-42-Generic-ctx-serialization-mechanism 0e94bfc88 -> 96cb3bbd1
tests are passing, serialized init_func Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/96cb3bbd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/96cb3bbd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/96cb3bbd Branch: refs/heads/ARIA-42-Generic-ctx-serialization-mechanism Commit: 96cb3bbd189b42f1222d73777de7bb21b7b6e197 Parents: 0e94bfc Author: mxmrlv <[email protected]> Authored: Mon Feb 6 16:05:56 2017 +0200 Committer: mxmrlv <[email protected]> Committed: Mon Feb 6 16:05:56 2017 +0200 ---------------------------------------------------------------------- aria/__init__.py | 7 +++++-- aria/orchestrator/context/serialization.py | 14 +++++++++++--- aria/orchestrator/workflows/executor/process.py | 12 ++++++++++-- aria/storage/api.py | 19 +++++++++++++++++-- aria/storage/sql_mapi.py | 10 ++++++++-- tests/mock/context.py | 5 +++-- tests/orchestrator/context/test_serialize.py | 7 ++++--- .../orchestrator/execution_plugin/test_local.py | 4 +--- tests/orchestrator/test_runner.py | 3 ++- tests/storage/__init__.py | 2 ++ tests/storage/test_instrumentation.py | 1 + tests/storage/test_model_storage.py | 1 + tests/storage/test_structures.py | 1 + 13 files changed, 66 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 48e4f00..f3bc043 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -57,7 +57,7 @@ def install_aria_extensions(): extension.init() -def application_model_storage(api, api_kwargs=None): +def application_model_storage(api, api_kwargs=None, storage_initiator_func=None): """ Initiate model storage """ @@ -79,12 +79,15 @@ def application_model_storage(api, api_kwargs=None): storage.model.Task, ] # if api not in _model_storage: + api.storage_initiator(storage_initiator_func or storage.sql_mapi.init_storage) return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {}) -def application_resource_storage(api, api_kwargs=None): +def application_resource_storage(api, api_kwargs=None, storage_initiator_func=None): """ Initiate resource storage """ + + # api.storage_initiator(storage_initiator_func or (lambda **kwargs: kwargs)) return storage.ResourceStorage( api, api_kwargs=api_kwargs or {}, items=['blueprint', 'deployment', 'plugin', ]) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/aria/orchestrator/context/serialization.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py index b2f1e97..f9ad090 100644 --- a/aria/orchestrator/context/serialization.py +++ b/aria/orchestrator/context/serialization.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import dill + import aria @@ -30,14 +32,18 @@ def operation_context_to_dict(context): context_dict['model_storage'] = { 'api_cls': model.api, 'api_kwargs': model._api_kwargs, + 'init_func': model.api._init_func.__func__ if hasattr(model.api, '_init_func') else None } + else: context_dict['model_storage'] = None if context.resource: resource = context.resource context_dict['resource_storage'] = { 'api_cls': resource.api, - 'api_kwargs': resource._api_kwargs + 'api_kwargs': resource._api_kwargs, + 'init_func': resource.api._init_func.__func__ + if hasattr(resource.api, '_init_func') else None } else: context_dict['resource_storage'] = None @@ -55,14 +61,16 @@ def operation_context_from_dict(context_dict): if model_storage: api_cls = model_storage['api_cls'] api_kwargs = model_storage['api_kwargs'] + init_func = model_storage['init_func'] context['model_storage'] = aria.application_model_storage( - api_cls, api_kwargs=api_kwargs) + api_cls, api_kwargs=api_kwargs, storage_initiator_func=init_func) resource_storage = context['resource_storage'] if resource_storage: api_cls = resource_storage['api_cls'] api_kwargs = resource_storage['api_kwargs'] + init_func = resource_storage['init_func'] context['resource_storage'] = aria.application_resource_storage( - api=api_cls, api_kwargs=api_kwargs) + api=api_cls, api_kwargs=api_kwargs, storage_initiator_func=init_func) return context_cls(**context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 7d990fa..cf50ac6 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -37,6 +37,7 @@ import struct import subprocess import tempfile import Queue +import dill import jsonpickle @@ -113,7 +114,7 @@ class ProcessExecutor(base.BaseExecutor): file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') os.close(file_descriptor) with open(arguments_json_path, 'wb') as f: - f.write(jsonpickle.dumps(self._create_arguments_dict(task))) + f.write(dill.dumps(self._create_arguments_dict(task))) env = os.environ.copy() # See _update_env for plugin_prefix usage @@ -306,7 +307,7 @@ def _patch_session(ctx, messenger, instrument): def _main(): arguments_json_path = sys.argv[1] with open(arguments_json_path) as f: - arguments = jsonpickle.loads(f.read()) + arguments = dill.loads(f.read()) # arguments_json_path is a temporary file created by the parent process. # so we remove it here @@ -326,6 +327,7 @@ def _main(): storage_type.remove_mutable_association_listener() with instrumentation.track_changes() as instrument: + ctx = None try: ctx = serialization.operation_context_from_dict(context_dict) _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) @@ -337,6 +339,12 @@ def _main(): messenger.succeeded(tracked_changes=instrument.tracked_changes) except BaseException as e: messenger.failed(exception=e, tracked_changes=instrument.tracked_changes) + finally: + if not ctx: + return + for session in set(mapi._session for mapi in ctx.model.registered.values() or []): + session.close() + if __name__ == '__main__': _main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/aria/storage/api.py ---------------------------------------------------------------------- diff --git a/aria/storage/api.py b/aria/storage/api.py index adf3198..e15795b 100644 --- a/aria/storage/api.py +++ b/aria/storage/api.py @@ -34,8 +34,23 @@ class StorageAPI(object): def storage_initiator(cls, func=None): if func is None: return partial(cls.storage_initiator, cls=cls) - init = cls.__init__ - cls.__init__ = lambda self, *args, **kwargs: init(self, **func(cls=cls, *args, **kwargs)) + cls._original_init = cls.__init__ + cls._init_func = func + cls.__init__ = \ + lambda self, *args, **kwargs: cls._original_init(self, **func(cls=cls, *args, **kwargs)) + + @classmethod + def free_storage_initiator(cls, safe=False): + if not hasattr(cls, '_original_init'): + raise Exception('No storage initiator was registered') + elif cls.__init__ != cls._original_init: + cls.__init__ = cls._original_init + + if hasattr(cls, '_engine'): + delattr(cls, '_engine') + if hasattr(cls, '_session'): + delattr(cls, '_session') + class ModelAPI(StorageAPI): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/aria/storage/sql_mapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py index ed993fa..1138fff 100644 --- a/aria/storage/sql_mapi.py +++ b/aria/storage/sql_mapi.py @@ -370,9 +370,15 @@ class SQLAlchemyModelAPI(api.ModelAPI): getattr(instance, rel.key) [email protected]_initiator def init_storage(cls, base_dir=None, filename='db.sqlite', **kwargs): - if not hasattr(cls, '_engine'): + + _storage_kwargs = { + 'base_dir': base_dir, + 'filename': filename + } + _storage_kwargs.update(**kwargs) + + if not hasattr(cls, '_engine') or getattr(cls, '_storage_kwargs', {}) == _storage_kwargs: if base_dir is not None: uri = 'sqlite:///{platform_char}{path}'.format( # Handles the windows behavior where there is not root, but drivers. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index 0f8ae41..c5da063 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -16,15 +16,16 @@ import aria from aria.orchestrator import context from aria.storage.filesystem_rapi import FileSystemResourceAPI -from aria.storage.sql_mapi import SQLAlchemyModelAPI +from aria.storage import sql_mapi from . import models from .topology import create_simple_topology_two_nodes def simple(tmpdir, model_driver_kwargs=None, resources_driver_kwargs=None, context_kwargs=None): + model_storage = aria.application_model_storage( - SQLAlchemyModelAPI, api_kwargs=model_driver_kwargs or {}) + sql_mapi.SQLAlchemyModelAPI, api_kwargs=model_driver_kwargs or {}) resource_storage = aria.application_resource_storage( FileSystemResourceAPI, api_kwargs=resources_driver_kwargs or dict(directory=str(tmpdir))) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index e881266..a6dbc45 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -16,8 +16,7 @@ import pytest import aria -from aria.storage import model -from aria.storage.sql_mapi import SQLAlchemyModelAPI +from aria.storage import sql_mapi from aria.orchestrator.workflows import api from aria.orchestrator.workflows.core import engine from aria.orchestrator.workflows.executor import process @@ -100,6 +99,8 @@ def context(tmpdir): @pytest.fixture def memory_model_storage(): - result = aria.application_model_storage(SQLAlchemyModelAPI) + result = aria.application_model_storage(sql_mapi.SQLAlchemyModelAPI) yield result storage.release_sqlite_storage(result) + + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index f308071..dd41224 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -36,7 +36,6 @@ from tests.orchestrator.workflows.helpers import events_collector IS_WINDOWS = os.name == 'nt' [email protected]("debugging...") class TestLocalRunScript(object): def test_script_path_parameter(self, executor, workflow_context, tmpdir): @@ -505,8 +504,7 @@ if __name__ == '__main__': @pytest.fixture def workflow_context(self, tmpdir): workflow_context = mock.context.simple( - str(tmpdir.join('resources')), - model_driver_kwargs=dict(base_dir=str(tmpdir))) + str(tmpdir.join('resources')), model_driver_kwargs=dict(base_dir=str(tmpdir))) workflow_context.states = [] workflow_context.exception = None yield workflow_context http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/orchestrator/test_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_runner.py b/tests/orchestrator/test_runner.py index 1d46e91..0a6fadf 100644 --- a/tests/orchestrator/test_runner.py +++ b/tests/orchestrator/test_runner.py @@ -36,6 +36,7 @@ def cleanup(): OPERATION_RESULTS.clear() [email protected]() def test_runner_no_tasks(): @workflow def workflow_fn(ctx, graph): # pylint: disable=unused-argument @@ -43,7 +44,7 @@ def test_runner_no_tasks(): _test_runner(workflow_fn) - [email protected]() def test_runner_tasks(): @workflow def workflow_fn(ctx, graph): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/storage/__init__.py ---------------------------------------------------------------------- diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py index 3c1ac68..589a01f 100644 --- a/tests/storage/__init__.py +++ b/tests/storage/__init__.py @@ -61,6 +61,8 @@ def release_sqlite_storage(storage): mapis = storage.registered.values() if mapis: + for mapi in mapis: + mapi.free_storage_initiator(safe=True) for session in set(mapi._session for mapi in mapis): session.rollback() session.close() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index db888b3..7eb733f 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -328,6 +328,7 @@ def restore_instrumentation(): @pytest.fixture def storage(): + sql_mapi.SQLAlchemyModelAPI.storage_initiator(sql_mapi.init_storage) result = ModelStorage( api_cls=sql_mapi.SQLAlchemyModelAPI, items=(MockModel1, MockModel2, StrictMockModel)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/storage/test_model_storage.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_model_storage.py b/tests/storage/test_model_storage.py index e67de4a..02bcee7 100644 --- a/tests/storage/test_model_storage.py +++ b/tests/storage/test_model_storage.py @@ -30,6 +30,7 @@ from . import MockModel @pytest.fixture def storage(): base_storage = ModelStorage(sql_mapi.SQLAlchemyModelAPI) + sql_mapi.SQLAlchemyModelAPI.storage_initiator(sql_mapi.init_storage) base_storage.register(MockModel) yield base_storage release_sqlite_storage(base_storage) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/96cb3bbd/tests/storage/test_structures.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py index 4473916..853a667 100644 --- a/tests/storage/test_structures.py +++ b/tests/storage/test_structures.py @@ -37,6 +37,7 @@ from ..mock import ( @pytest.fixture def storage(): base_storage = ModelStorage(sql_mapi.SQLAlchemyModelAPI) + sql_mapi.SQLAlchemyModelAPI.storage_initiator(sql_mapi.init_storage) base_storage.register(MockModel) yield base_storage release_sqlite_storage(base_storage)
