Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-79-concurrent-storage-modifications 80eb0fa42 -> f0e6e6625 (forced update)

changed exception msgs

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f0e6e662
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f0e6e662
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f0e6e662

Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: f0e6e6625c8a5ae286e2d4db2d559731944c1440
Parents: ab138d4
Author: mxmrlv <[email protected]>
Authored: Thu Feb 16 17:41:43 2017 +0200
Committer: mxmrlv <[email protected]>
Committed: Thu Feb 16 17:49:32 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  7 +++--
 aria/storage/instrumentation.py                 | 29 ++++++++++++--------
 2 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f0e6e662/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 84f5f58..75bbbce 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -53,6 +53,8 @@ _IS_WIN = os.name == 'nt'
 
 _INT_FMT = 'I'
 _INT_SIZE = struct.calcsize(_INT_FMT)
+UPDATE_TRACKED_CHANGES_FAILED_STR = \
+    'Some changes failed writing to storage. For more info refer to the log.'
 
 
 class ProcessExecutor(base.BaseExecutor):
@@ -218,6 +220,7 @@ class ProcessExecutor(base.BaseExecutor):
                         try:
                             self._apply_tracked_changes(task, request)
                         except BaseException as e:
+                            e.message += UPDATE_TRACKED_CHANGES_FAILED_STR
                             self._task_failed(task, exception=e)
                         else:
                             self._task_succeeded(task)
@@ -227,8 +230,8 @@ class ProcessExecutor(base.BaseExecutor):
                         try:
                             self._apply_tracked_changes(task, request)
                         except BaseException as e:
-                            e.message = \
-                                '{0} Remote task execution failed due: {1}'.format(str(e), request['exception'])
+                            e.message += 'Task failed due to {0}.'.format(request['exception']) + \
+                                         UPDATE_TRACKED_CHANGES_FAILED_STR
                             self._task_failed(task, exception=e)
                         else:
                             self._task_failed(task, exception=request['exception'])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f0e6e662/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index b3ca24a..8fb9d82 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -155,6 +155,10 @@ class _Value(object):
     def __hash__(self):
         return hash((self.initial, self.current))
 
+    @property
+    def dict(self):
+        return {'initial': self.initial, 'current': self.current}.copy()
+
 
 def apply_tracked_changes(tracked_changes, model):
     """Write tracked changes back to the database using provided model storage
@@ -163,12 +167,13 @@ def apply_tracked_changes(tracked_changes, model):
                             returned by calling ``track_changes()``
     :param model: The model storage used to actually apply the changes
     """
-    successfully_updated_instances = dict()
+    successfully_updated_changes = dict()
     try:
         for mapi_name, tracked_instances in tracked_changes.items():
-            successfully_updated_instances[mapi_name] = list()
+            successfully_updated_changes[mapi_name] = dict()
             mapi = getattr(model, mapi_name)
             for instance_id, tracked_attributes in tracked_instances.items():
+                successfully_updated_changes[mapi_name][instance_id] = dict()
                 instance = None
                 for attribute_name, value in tracked_attributes.items():
                     if value.initial != value.current:
@@ -178,16 +183,18 @@ def apply_tracked_changes(tracked_changes, model):
                 if instance:
                     _validate_version_id(instance, mapi)
                     mapi.update(instance)
-                    successfully_updated_instances[mapi_name].append(instance_id)
-    except BaseException as e:
-        for key, value in successfully_updated_instances.items():
+                    successfully_updated_changes[mapi_name][instance_id] = [
+                        v.dict for v in tracked_attributes.values()]
+    except BaseException:
+        for key, value in successfully_updated_changes.items():
             if not value:
-                del successfully_updated_instances[key]
-        e.message = \
-            'Registering all the changes to the storage has failed. ' \
-            'The instances that were successfully updated : {0} .' \
-            'This was caused by {1}.'.format(json.dumps(successfully_updated_instances), str(e))
-        raise e
+                del successfully_updated_changes[key]
+        model.logger.error(
+            'Registering all the changes to the storage has failed. \n'
+            'The successful updates were: \n '
+            '{0}'.format(json.dumps(successfully_updated_changes, indent=4)))
+
+        raise
 
 
 def _validate_version_id(instance, mapi):
