Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-79-concurrent-storage-modifications ab138d4d7 -> 80eb0fa42


changed exception msgs


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/80eb0fa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/80eb0fa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/80eb0fa4

Branch: refs/heads/ARIA-79-concurrent-storage-modifications
Commit: 80eb0fa42b30a2db09b7a0d36659dcf777c83c61
Parents: ab138d4
Author: mxmrlv <[email protected]>
Authored: Thu Feb 16 17:41:43 2017 +0200
Committer: mxmrlv <[email protected]>
Committed: Thu Feb 16 17:41:43 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  8 ++++--
 aria/storage/instrumentation.py                 | 28 ++++++++++++--------
 2 files changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/80eb0fa4/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 84f5f58..3122348 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -64,6 +64,9 @@ class ProcessExecutor(base.BaseExecutor):
         super(ProcessExecutor, self).__init__(*args, **kwargs)
         self._plugin_manager = plugin_manager
 
+        self._update_tracked_changes_failed_str = \
+            'Some changes failed writing to storage. For more info refer to the log.'
+
         # Optional list of additional directories that should be added to
         # subprocesses python path
         self._python_path = python_path or []
@@ -218,6 +221,7 @@ class ProcessExecutor(base.BaseExecutor):
         try:
             self._apply_tracked_changes(task, request)
         except BaseException as e:
+            e.message += self._update_tracked_changes_failed_str
             self._task_failed(task, exception=e)
         else:
             self._task_succeeded(task)
@@ -227,8 +231,8 @@ class ProcessExecutor(base.BaseExecutor):
         try:
             self._apply_tracked_changes(task, request)
         except BaseException as e:
-            e.message = \
-                '{0} Remote task execution failed due: {1}'.format(str(e), request['exception'])
+            e.message += 'Task failed due {0}.'.format(request['exception']) + \
+                         self._update_tracked_changes_failed_str
             self._task_failed(task, exception=e)
         else:
             self._task_failed(task, exception=request['exception'])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/80eb0fa4/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index b3ca24a..1360f06 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -155,6 +155,10 @@ class _Value(object):
     def __hash__(self):
         return hash((self.initial, self.current))
 
+    @property
+    def dict(self):
+        return {'initial': self.initial, 'current': self.current}.copy()
+
 
 def apply_tracked_changes(tracked_changes, model):
     """Write tracked changes back to the database using provided model storage
@@ -163,12 +167,13 @@ def apply_tracked_changes(tracked_changes, model):
                             returned by calling ``track_changes()``
     :param model: The model storage used to actually apply the changes
     """
-    successfully_updated_instances = dict()
+    successfully_updated_changes = dict()
     try:
         for mapi_name, tracked_instances in tracked_changes.items():
-            successfully_updated_instances[mapi_name] = list()
+            successfully_updated_changes[mapi_name] = dict()
             mapi = getattr(model, mapi_name)
             for instance_id, tracked_attributes in tracked_instances.items():
+                successfully_updated_changes[mapi_name][instance_id] = dict()
                 instance = None
                 for attribute_name, value in tracked_attributes.items():
                     if value.initial != value.current:
@@ -178,16 +183,17 @@ def apply_tracked_changes(tracked_changes, model):
                 if instance:
                     _validate_version_id(instance, mapi)
                     mapi.update(instance)
-                    successfully_updated_instances[mapi_name].append(instance_id)
-    except BaseException as e:
-        for key, value in successfully_updated_instances.items():
+                    successfully_updated_changes[mapi_name][instance_id] = [
+                        v.dict for v in tracked_attributes.values()]
+    except BaseException:
+        for key, value in successfully_updated_changes.items():
             if not value:
-                del successfully_updated_instances[key]
-        e.message = \
-            'Registering all the changes to the storage has failed. ' \
-            'The instances that were successfully updated : {0} .' \
-            'This was caused by {1}.'.format(json.dumps(successfully_updated_instances), str(e))
-        raise e
+                del successfully_updated_changes[key]
+        model.logger.error('Registering all the changes to the storage has failed.')
+        model.logger.error('The successful updates were {0}'.format(
+            json.dumps(successfully_updated_changes, indent=4)))
+
+        raise
 
 
 def _validate_version_id(instance, mapi):

Reply via email to