Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue a37af2a8f -> 9d0f37ec5
code review 1 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9d0f37ec Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9d0f37ec Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9d0f37ec Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue Commit: 9d0f37ec569585206eee8a99756b02688e6f6527 Parents: a37af2a Author: max-orlov <[email protected]> Authored: Thu May 11 16:45:35 2017 +0300 Committer: max-orlov <[email protected]> Committed: Thu May 11 16:45:35 2017 +0300 ---------------------------------------------------------------------- aria/logger.py | 7 +-- aria/orchestrator/workflows/executor/process.py | 7 ++- aria/storage/instrumentation.py | 59 +++++++++++--------- .../workflows/executor/test_executor.py | 2 +- tests/storage/test_instrumentation.py | 8 +-- 5 files changed, 41 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/aria/logger.py ---------------------------------------------------------------------- diff --git a/aria/logger.py b/aria/logger.py index 9214bd9..bd7ed4e 100644 --- a/aria/logger.py +++ b/aria/logger.py @@ -118,12 +118,7 @@ def create_sqla_log_handler(model, log_cls, execution_id, level=logging.DEBUG): # This is needed since the engine and session are entirely new we need to reflect the db # schema of the logging model into the engine and session. - log_cls.__table__.create(bind=model.log._engine, checkfirst=True) - - return _SQLAlchemyHandler(model=model, - log_cls=log_cls, - execution_id=execution_id, - level=level) + return _SQLAlchemyHandler(model=model, log_cls=log_cls, execution_id=execution_id, level=level) class _DefaultConsoleFormat(logging.Formatter): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index e8cf019..824c4e1 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -340,7 +340,7 @@ def _patch_ctx(ctx, messenger, instrument): def patched_rollback(): # Rollback is performed on parent process when commit fails - pass + instrument.expunge_session() # when autoflush is set to true (the default), refreshing an object will trigger # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so @@ -378,7 +378,7 @@ def _main(): messenger.failed(exception=e, tracked_changes=None, new_instances=None) return - with instrumentation.track_changes(ctx) as instrument: + with instrumentation.track_changes(ctx.model) as instrument: try: messenger.started() _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument) @@ -393,7 +393,8 @@ def _main(): messenger.failed(exception=e, tracked_changes=instrument.tracked_changes, new_instances=instrument.new_instances) - + finally: + instrument.expunge_session() if __name__ == '__main__': _main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 6af9473..23f6fd0 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -14,6 +14,8 @@ # limitations under the License. import copy +import json +import os import sqlalchemy.event @@ -36,7 +38,7 @@ _INSTRUMENTED = { _NEW_INSTANCE = 'NEW_INSTANCE' -def track_changes(ctx=None, instrumented=None): +def track_changes(model=None, instrumented=None): """Track changes in the specified model columns This call will register event listeners using sqlalchemy's event mechanism. The listeners @@ -55,41 +57,43 @@ def track_changes(ctx=None, instrumented=None): the instrumentation context returned from this call, to the parent process. The parent process will then call ``apply_tracked_changes()`` that resides in this module as well. At that point, the changes will actually be written back to the database. - + + :param model: the model storage. it should hold a mapi for each model. the session of each mapi + is needed to setup events :param instrumented: A dict from model columns to their python native type :return: The instrumentation context """ - return _Instrumentation(ctx, instrumented or _INSTRUMENTED) + return _Instrumentation(model, instrumented or _INSTRUMENTED) class _Instrumentation(object): - def __init__(self, ctx, instrumented): + def __init__(self, model, instrumented): self.tracked_changes = {} self.new_instances = {} self.listeners = [] self._new_modeled_instances = [] - self._ctx = ctx + self._model = model self._track_changes(instrumented) - self._new_instance_index = 0 @property def _new_instance_id(self): - self._new_instance_index += 1 - return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1) + return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, + index=len(self._new_modeled_instances)) def expunge_session(self): for new_instance in self._new_modeled_instances: - self._get_session_from_ctx(new_instance.__tablename__).expunge(new_instance) + self._get_session_from_model(new_instance.__tablename__).expunge(new_instance) - def _get_session_from_ctx(self, tablename): - mapi = getattr(self._ctx.model, tablename, None) + def _get_session_from_model(self, tablename): + mapi = getattr(self._model, tablename, None) if mapi: return mapi._session raise StorageError("Could not retrieve session for {0}".format(tablename)) def _track_changes(self, instrumented): instrumented_attribute_classes = {} + # Track any newly-set attributes. for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): self._register_set_attribute_listener( instrumented_attribute=instrumented_attribute, @@ -98,25 +102,24 @@ class _Instrumentation(object): instrumented_class_attributes = instrumented_attribute_classes.setdefault( instrumented_class, {}) instrumented_class_attributes[instrumented_attribute.key] = attribute_type + + # Track any global instance update such as 'refresh' or 'load' for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items(): - self._register_instance_attribute_listeners( - instrumented_class=instrumented_class, - instrumented_attributes=instrumented_attributes) + self._register_instance_listeners(instrumented_class=instrumented_class, + instrumented_attributes=instrumented_attributes) - # instrument creation of new instances + # Track any newly created instances. for instrumented_class in instrumented.get('new', {}): - self._register_instance_listener(instrumented_class) + self._register_new_instance_listener(instrumented_class) - def _register_instance_listener(self, instrumented_class): - if self._ctx is None: + def _register_new_instance_listener(self, instrumented_class): + if self._model is None: if instrumented_class: raise StorageError("In order to keep track of new instances, a ctx is needed") else: return def listener(_, instance): - if not isinstance(instance, instrumented_class): - return self._new_modeled_instances.append(instance) tracked_instances = self.new_instances.setdefault(instance.__modelname__, {}) tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {}) @@ -124,7 +127,7 @@ class _Instrumentation(object): instance_as_dict.update((k, getattr(instance, k)) for k in getattr(instance, '__private_fields__', [])) tracked_attributes.update(instance_as_dict) - session = self._get_session_from_ctx(instrumented_class.__tablename__) + session = self._get_session_from_model(instrumented_class.__tablename__) listener_args = (session, 'after_attach', listener) sqlalchemy.event.listen(*listener_args) self.listeners.append(listener_args) @@ -144,7 +147,7 @@ class _Instrumentation(object): sqlalchemy.event.listen(*listener_args, retval=True) self.listeners.append(listener_args) - def _register_instance_attribute_listeners(self, instrumented_class, instrumented_attributes): + def _register_instance_listeners(self, instrumented_class, instrumented_attributes): def listener(target, *_): mapi_name = instrumented_class.__modelname__ tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) @@ -224,6 +227,7 @@ def apply_tracked_changes(tracked_changes, new_instances, model): """ successfully_updated_changes = dict() try: + # handle instance updates for mapi_name, tracked_instances in tracked_changes.items(): successfully_updated_changes[mapi_name] = dict() mapi = getattr(model, mapi_name) @@ -240,10 +244,11 @@ def apply_tracked_changes(tracked_changes, new_instances, model): successfully_updated_changes[mapi_name][instance_id] = [ v.dict for v in tracked_attributes.values()] + # Handle new instances for mapi_name, new_instance in new_instances.items(): successfully_updated_changes[mapi_name] = dict() mapi = getattr(model, mapi_name) - for new_instance_kwargs in sorted(new_instance.values()): + for new_instance_kwargs in new_instance.values(): instance = mapi.model_cls(**new_instance_kwargs) mapi.put(instance) successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs @@ -252,10 +257,10 @@ def apply_tracked_changes(tracked_changes, new_instances, model): if not value: del successfully_updated_changes[key] # TODO: if the successful has _STUB, the logging fails because it can't serialize the object - # model.logger.error( - # 'Registering all the changes to the storage has failed. {0}' - # 'The successful updates were: {0} ' - # '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4))) + model.logger.error( + 'Registering all the changes to the storage has failed. {0}' + 'The successful updates were: {0} ' + '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4))) raise http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 47604e9..29cb0e8 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -55,7 +55,7 @@ def execute_and_assert(executor, storage=None): @retrying.retry(stop_max_delay=10000, wait_fixed=100) def assertion(): - # assert successful_task.states == ['start', 'success'] + assert successful_task.states == ['start', 'success'] assert failing_task.states == ['start', 'failure'] assert task_with_inputs.states == ['start', 'failure'] assert isinstance(failing_task.exception, MockException) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index 24c4204..cfea963 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -36,7 +36,6 @@ Value = instrumentation._Value instruments_holder = [] -# TODO: add testing for new instances class TestInstrumentation(object): def test_track_changes(self, storage): @@ -289,8 +288,7 @@ class TestInstrumentation(object): model_instance_1 = MockModel1(**model_kwargs) model_instance_2 = MockModel2(**model_kwargs) - ctx = MockContext(storage) - instrument = self._track_changes(ctx=ctx, instrumented_new=(MockModel1, )) + instrument = self._track_changes(model=storage, instrumented_new=(MockModel1,)) assert not instrument.tracked_changes storage.mock_model_1.put(model_instance_1) @@ -307,9 +305,9 @@ class TestInstrumentation(object): for key in model_kwargs: assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key) - def _track_changes(self, instrumented_modified=None, ctx=None, instrumented_new=None): + def _track_changes(self, instrumented_modified=None, model=None, instrumented_new=None): instrument = instrumentation.track_changes( - ctx=ctx, + model=model, instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}}) instruments_holder.append(instrument) return instrument
