Added tests and moved the session expunge call to process.py
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/254f879f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/254f879f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/254f879f Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue Commit: 254f879ffea30af16884428e16c880c6108338c5 Parents: ace30ae Author: max-orlov <[email protected]> Authored: Thu May 11 14:05:46 2017 +0300 Committer: max-orlov <[email protected]> Committed: Thu May 11 14:28:49 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 1 + aria/storage/instrumentation.py | 37 ++++++++++++------- tests/storage/test_instrumentation.py | 38 ++++++++++++++++++-- 3 files changed, 61 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/254f879f/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index a2f92f9..e8cf019 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -335,6 +335,7 @@ def _patch_ctx(ctx, messenger, instrument): def patched_commit(): messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances) + instrument.expunge_session() instrument.clear() def patched_rollback(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/254f879f/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 98770de..6af9473 100644 --- a/aria/storage/instrumentation.py 
+++ b/aria/storage/instrumentation.py @@ -68,7 +68,9 @@ class _Instrumentation(object): self.tracked_changes = {} self.new_instances = {} self.listeners = [] - self._track_changes(ctx, instrumented) + self._new_modeled_instances = [] + self._ctx = ctx + self._track_changes(instrumented) self._new_instance_index = 0 @property @@ -76,7 +78,17 @@ class _Instrumentation(object): self._new_instance_index += 1 return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1) - def _track_changes(self, ctx, instrumented): + def expunge_session(self): + for new_instance in self._new_modeled_instances: + self._get_session_from_ctx(new_instance.__tablename__).expunge(new_instance) + + def _get_session_from_ctx(self, tablename): + mapi = getattr(self._ctx.model, tablename, None) + if mapi: + return mapi._session + raise StorageError("Could not retrieve session for {0}".format(tablename)) + + def _track_changes(self, instrumented): instrumented_attribute_classes = {} for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): self._register_set_attribute_listener( @@ -93,29 +105,27 @@ class _Instrumentation(object): # instrument creation of new instances for instrumented_class in instrumented.get('new', {}): - self._register_instance_listener(ctx, instrumented_class) + self._register_instance_listener(instrumented_class) - def _register_instance_listener(self, ctx, instrumented_class): - if ctx is None: + def _register_instance_listener(self, instrumented_class): + if self._ctx is None: if instrumented_class: raise StorageError("In order to keep track of new instances, a ctx is needed") else: return - def listener(session, instance): + def listener(_, instance): if not isinstance(instance, instrumented_class): return - session.expunge(instance) + self._new_modeled_instances.append(instance) tracked_instances = self.new_instances.setdefault(instance.__modelname__, {}) tracked_attributes = 
tracked_instances.setdefault(self._new_instance_id, {}) instance_as_dict = instance.to_dict() - instance_as_dict.update((k, getattr(instance, k)) for k in instance.__private_fields__) + instance_as_dict.update((k, getattr(instance, k)) + for k in getattr(instance, '__private_fields__', [])) tracked_attributes.update(instance_as_dict) - mapi = getattr(ctx.model, instrumented_class.__tablename__, None) - if not mapi: - raise StorageError( - "Could not retrieve session for {0}".format(instrumented_class.__tablename__)) - listener_args = (mapi._session, 'after_attach', listener) + session = self._get_session_from_ctx(instrumented_class.__tablename__) + listener_args = (session, 'after_attach', listener) sqlalchemy.event.listen(*listener_args) self.listeners.append(listener_args) @@ -168,6 +178,7 @@ class _Instrumentation(object): self.tracked_changes.clear() self.new_instances.clear() + self._new_modeled_instances = [] def restore(self): """Remove all listeners registered by this instrumentation""" http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/254f879f/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index 227ee7b..24c4204 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -27,6 +27,7 @@ from aria.storage import ( sql_mapi, instrumentation ) +from tests.orchestrator.workflows.executor import MockContext from . 
import release_sqlite_storage, init_inmemory_model_storage @@ -275,8 +276,41 @@ class TestInstrumentation(object): instrument.clear() assert instrument.tracked_changes == {} - def _track_changes(self, instrumented): - instrument = instrumentation.track_changes(instrumented={'modified': instrumented}) + def test_new_instances(self, storage): + model_kwargs = dict( + name='name', + dict1={'initial': 'value'}, + dict2={'initial': 'value'}, + list1=['initial'], + list2=['initial'], + int1=0, + int2=0, + string2='string') + model_instance_1 = MockModel1(**model_kwargs) + model_instance_2 = MockModel2(**model_kwargs) + + ctx = MockContext(storage) + instrument = self._track_changes(ctx=ctx, instrumented_new=(MockModel1, )) + assert not instrument.tracked_changes + + storage.mock_model_1.put(model_instance_1) + storage.mock_model_2.put(model_instance_2) + # Assert all models made it to storage + assert len(storage.mock_model_1.list()) == len(storage.mock_model_2.list()) == 1 + + # Assert only one model was tracked + assert len(instrument.new_instances) == 1 + + mock_model_1 = instrument.new_instances[MockModel1.__tablename__].values()[0] + storage_model1_instance = storage.mock_model_1.get(model_instance_1.id) + + for key in model_kwargs: + assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key) + + def _track_changes(self, instrumented_modified=None, ctx=None, instrumented_new=None): + instrument = instrumentation.track_changes( + ctx=ctx, + instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}}) instruments_holder.append(instrument) return instrument
