Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-79-concurrent-storage-modifications ab138d4d7 -> 80eb0fa42
changed exception msgs Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/80eb0fa4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/80eb0fa4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/80eb0fa4 Branch: refs/heads/ARIA-79-concurrent-storage-modifications Commit: 80eb0fa42b30a2db09b7a0d36659dcf777c83c61 Parents: ab138d4 Author: mxmrlv <[email protected]> Authored: Thu Feb 16 17:41:43 2017 +0200 Committer: mxmrlv <[email protected]> Committed: Thu Feb 16 17:41:43 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 8 ++++-- aria/storage/instrumentation.py | 28 ++++++++++++-------- 2 files changed, 23 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/80eb0fa4/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 84f5f58..3122348 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -64,6 +64,9 @@ class ProcessExecutor(base.BaseExecutor): super(ProcessExecutor, self).__init__(*args, **kwargs) self._plugin_manager = plugin_manager + self._update_tracked_changes_failed_str = \ + 'Some changes failed writing to storage. For more info refer to the log.' + # Optional list of additional directories that should be added to # subprocesses python path self._python_path = python_path or [] @@ -218,6 +221,7 @@ class ProcessExecutor(base.BaseExecutor): try: self._apply_tracked_changes(task, request) except BaseException as e: + e.message += self._update_tracked_changes_failed_str self._task_failed(task, exception=e) else: self._task_succeeded(task) @@ -227,8 +231,8 @@ class ProcessExecutor(base.BaseExecutor): try: self._apply_tracked_changes(task, request) except BaseException as e: - e.message = \ - '{0} Remote task execution failed due: {1}'.format(str(e), request['exception']) + e.message += 'Task failed due {0}.'.format(request['exception']) + \ + self._update_tracked_changes_failed_str self._task_failed(task, exception=e) else: self._task_failed(task, exception=request['exception']) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/80eb0fa4/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index b3ca24a..1360f06 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -155,6 +155,10 @@ class _Value(object): def __hash__(self): return hash((self.initial, self.current)) + @property + def dict(self): + return {'initial': self.initial, 'current': self.current}.copy() + def apply_tracked_changes(tracked_changes, model): """Write tracked changes back to the database using provided model storage @@ -163,12 +167,13 @@ def apply_tracked_changes(tracked_changes, model): returned by calling ``track_changes()`` :param model: The model storage used to actually apply the changes """ - successfully_updated_instances = dict() + successfully_updated_changes = dict() try: for mapi_name, tracked_instances in tracked_changes.items(): - successfully_updated_instances[mapi_name] = list() + successfully_updated_changes[mapi_name] = dict() mapi = getattr(model, mapi_name) for instance_id, tracked_attributes in tracked_instances.items(): + successfully_updated_changes[mapi_name][instance_id] = dict() instance = None for attribute_name, value in tracked_attributes.items(): if value.initial != value.current: @@ -178,16 +183,17 @@ def apply_tracked_changes(tracked_changes, model): if instance: _validate_version_id(instance, mapi) mapi.update(instance) - successfully_updated_instances[mapi_name].append(instance_id) - except BaseException as e: - for key, value in successfully_updated_instances.items(): + successfully_updated_changes[mapi_name][instance_id] = [ + v.dict for v in tracked_attributes.values()] + except BaseException: + for key, value in successfully_updated_changes.items(): if not value: - del successfully_updated_instances[key] - e.message = \ - 'Registering all the changes to the storage has failed. ' \ - 'The instances that were successfully updated : {0} .' \ - 'This was caused by {1}.'.format(json.dumps(successfully_updated_instances), str(e)) - raise e + del successfully_updated_changes[key] + model.logger.error('Registering all the changes to the storage has failed.') + model.logger.error('The successful updates were {0}'.format( + json.dumps(successfully_updated_changes, indent=4))) + + raise def _validate_version_id(instance, mapi):
