Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue 254f879ff -> a37af2a8f (forced update)
added tests and moved expunge to process.py Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a37af2a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a37af2a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a37af2a8 Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue Commit: a37af2a8f8e742892c86a438770693c77463ca13 Parents: ace30ae Author: max-orlov <[email protected]> Authored: Thu May 11 14:05:46 2017 +0300 Committer: max-orlov <[email protected]> Committed: Thu May 11 14:58:23 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 1 + aria/storage/instrumentation.py | 37 ++++++++++++------- aria/storage/sql_mapi.py | 2 +- examples/hello-world/scripts/start.sh | 8 ++--- tests/orchestrator/context/test_operation.py | 2 +- tests/storage/test_instrumentation.py | 38 ++++++++++++++++++-- 6 files changed, 67 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a37af2a8/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index a2f92f9..e8cf019 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -335,6 +335,7 @@ def _patch_ctx(ctx, messenger, instrument): def patched_commit(): messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances) + instrument.expunge_session() instrument.clear() def patched_rollback(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a37af2a8/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 98770de..6af9473 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -68,7 +68,9 @@ class _Instrumentation(object): self.tracked_changes = {} self.new_instances = {} self.listeners = [] - self._track_changes(ctx, instrumented) + self._new_modeled_instances = [] + self._ctx = ctx + self._track_changes(instrumented) self._new_instance_index = 0 @property @@ -76,7 +78,17 @@ class _Instrumentation(object): self._new_instance_index += 1 return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1) - def _track_changes(self, ctx, instrumented): + def expunge_session(self): + for new_instance in self._new_modeled_instances: + self._get_session_from_ctx(new_instance.__tablename__).expunge(new_instance) + + def _get_session_from_ctx(self, tablename): + mapi = getattr(self._ctx.model, tablename, None) + if mapi: + return mapi._session + raise StorageError("Could not retrieve session for {0}".format(tablename)) + + def _track_changes(self, instrumented): instrumented_attribute_classes = {} for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): self._register_set_attribute_listener( @@ -93,29 +105,27 @@ class _Instrumentation(object): # instrument creation of new instances for instrumented_class in instrumented.get('new', {}): - self._register_instance_listener(ctx, instrumented_class) + self._register_instance_listener(instrumented_class) - def _register_instance_listener(self, ctx, instrumented_class): - if ctx is None: + def _register_instance_listener(self, instrumented_class): + if self._ctx is None: if instrumented_class: raise StorageError("In order to keep track of new instances, a ctx is needed") else: return - def listener(session, instance): + def listener(_, instance): if not isinstance(instance, instrumented_class): return - session.expunge(instance) + self._new_modeled_instances.append(instance) tracked_instances = self.new_instances.setdefault(instance.__modelname__, {}) tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {}) instance_as_dict = instance.to_dict() - instance_as_dict.update((k, getattr(instance, k)) for k in instance.__private_fields__) + instance_as_dict.update((k, getattr(instance, k)) + for k in getattr(instance, '__private_fields__', [])) tracked_attributes.update(instance_as_dict) - mapi = getattr(ctx.model, instrumented_class.__tablename__, None) - if not mapi: - raise StorageError( - "Could not retrieve session for {0}".format(instrumented_class.__tablename__)) - listener_args = (mapi._session, 'after_attach', listener) + session = self._get_session_from_ctx(instrumented_class.__tablename__) + listener_args = (session, 'after_attach', listener) sqlalchemy.event.listen(*listener_args) self.listeners.append(listener_args) @@ -168,6 +178,7 @@ class _Instrumentation(object): self.tracked_changes.clear() self.new_instances.clear() + self._new_modeled_instances = [] def restore(self): """Remove all listeners registered by this instrumentation""" http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a37af2a8/aria/storage/sql_mapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py index a22c4b1..730d007 100644 --- a/aria/storage/sql_mapi.py +++ b/aria/storage/sql_mapi.py @@ -397,7 +397,7 @@ def init_storage(base_dir, filename='db.sqlite'): path=os.path.join(base_dir, filename)) - engine = create_engine(uri, ) + engine = create_engine(uri) session_factory = orm.sessionmaker(bind=engine) session = orm.scoped_session(session_factory=session_factory) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a37af2a8/examples/hello-world/scripts/start.sh ---------------------------------------------------------------------- diff --git a/examples/hello-world/scripts/start.sh b/examples/hello-world/scripts/start.sh index 41b6ec7..96298c5 100755 --- a/examples/hello-world/scripts/start.sh +++ b/examples/hello-world/scripts/start.sh @@ -28,7 +28,7 @@ server_is_up() { return 0 fi else -# ctx logger error "Both curl, wget were not found in path" + ctx logger error "Both curl, wget were not found in path" exit 1 fi return 1 @@ -38,15 +38,15 @@ STARTED=false for i in $(seq 1 15) do if server_is_up; then -# ctx logger info "Server is up." + ctx logger info "Server is up." STARTED=true break else -# ctx logger info "Server not up. waiting 1 second." + ctx logger info "Server not up. waiting 1 second." sleep 1 fi done if [ ${STARTED} = false ]; then -# ctx logger error "Failed starting web server in 15 seconds." + ctx logger error "Failed starting web server in 15 seconds." exit 1 fi http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a37af2a8/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 757a375..cdeb5fa 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -263,7 +263,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @pytest.fixture(params=[ - # (thread.ThreadExecutor, {}), + (thread.ThreadExecutor, {}), (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a37af2a8/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index 227ee7b..24c4204 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -27,6 +27,7 @@ from aria.storage import ( sql_mapi, instrumentation ) +from tests.orchestrator.workflows.executor import MockContext from . import release_sqlite_storage, init_inmemory_model_storage @@ -275,8 +276,41 @@ class TestInstrumentation(object): instrument.clear() assert instrument.tracked_changes == {} - def _track_changes(self, instrumented): - instrument = instrumentation.track_changes(instrumented={'modified': instrumented}) + def test_new_instances(self, storage): + model_kwargs = dict( + name='name', + dict1={'initial': 'value'}, + dict2={'initial': 'value'}, + list1=['initial'], + list2=['initial'], + int1=0, + int2=0, + string2='string') + model_instance_1 = MockModel1(**model_kwargs) + model_instance_2 = MockModel2(**model_kwargs) + + ctx = MockContext(storage) + instrument = self._track_changes(ctx=ctx, instrumented_new=(MockModel1, )) + assert not instrument.tracked_changes + + storage.mock_model_1.put(model_instance_1) + storage.mock_model_2.put(model_instance_2) + # Assert all models made it to storage + assert len(storage.mock_model_1.list()) == len(storage.mock_model_2.list()) == 1 + + # Assert only one model was tracked + assert len(instrument.new_instances) == 1 + + mock_model_1 = instrument.new_instances[MockModel1.__tablename__].values()[0] + storage_model1_instance = storage.mock_model_1.get(model_instance_1.id) + + for key in model_kwargs: + assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key) + + def _track_changes(self, instrumented_modified=None, ctx=None, instrumented_new=None): + instrument = instrumentation.track_changes( + ctx=ctx, + instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}}) instruments_holder.append(instrument) return instrument
