Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-68-commit-runtime-properties 5a7e5a205 -> 201ef1234 (forced update)


ARIA-68 Update runtime properties during operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/201ef123
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/201ef123
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/201ef123

Branch: refs/heads/ARIA-68-commit-runtime-properties
Commit: 201ef1234ee6d8aeec3d71479cbc16d245d681ae
Parents: 0382b22
Author: Dan Kilman <[email protected]>
Authored: Tue Jan 24 16:58:10 2017 +0200
Committer: Dan Kilman <[email protected]>
Committed: Tue Jan 24 17:14:12 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py | 79 +++++++++++++-------
 aria/storage/instrumentation.py                 | 17 ++++-
 2 files changed, 66 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/201ef123/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 9fa0302..e2fdbf2 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -29,6 +29,7 @@ script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
 
+import contextlib
 import io
 import threading
 import socket
@@ -136,37 +137,39 @@ class ProcessExecutor(base.BaseExecutor):
         while not self._stopped:
             try:
                 # Accept messages written to the server socket
-                message = self._recv_message()
-                message_type = message['type']
-                if message_type == 'closed':
-                    break
-                task_id = message['task_id']
-                if message_type == 'started':
-                    self._task_started(self._tasks[task_id])
-                elif message_type == 'succeeded':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_succeeded(task)
-                elif message_type == 'failed':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_failed(task, exception=message['exception'])
-                else:
-                    raise RuntimeError('Invalid state')
+                with contextlib.closing(self._server_socket.accept()[0]) as 
connection:
+                    message = self._recv_message(connection)
+                    message_type = message['type']
+                    if message_type == 'closed':
+                        break
+                    task_id = message['task_id']
+                    if message_type == 'started':
+                        self._task_started(self._tasks[task_id])
+                    elif message_type == 'apply_tracked_changes':
+                        task = self._tasks[task_id]
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                    elif message_type == 'succeeded':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_succeeded(task)
+                    elif message_type == 'failed':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_failed(task, exception=message['exception'])
+                    else:
+                        raise RuntimeError('Invalid state')
             except BaseException as e:
                 self.logger.debug('Error in process executor listener: 
{0}'.format(e))
 
-    def _recv_message(self):
-        connection, _ = self._server_socket.accept()
-        try:
-            message_len, = struct.unpack(_INT_FMT, 
self._recv_bytes(connection, _INT_SIZE))
-            return jsonpickle.loads(self._recv_bytes(connection, message_len))
-        finally:
-            connection.close()
+    def _recv_message(self, connection):
+        message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, 
_INT_SIZE))
+        return jsonpickle.loads(self._recv_bytes(connection, message_len))
 
     @staticmethod
     def _recv_bytes(connection, count):
@@ -247,6 +250,9 @@ class _Messenger(object):
         """Task failed message"""
         self._send_message(type='failed', tracked_changes=tracked_changes, 
exception=exception)
 
+    def apply_tracked_changes(self, tracked_changes):
+        self._send_message(type='apply_tracked_changes', 
tracked_changes=tracked_changes)
+
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
@@ -263,6 +269,7 @@ class _Messenger(object):
             })
             sock.send(struct.pack(_INT_FMT, len(data)))
             sock.sendall(data)
+            sock.recv(1)
         finally:
             sock.close()
 
@@ -292,6 +299,22 @@ def _main():
     with instrumentation.track_changes() as instrument:
         try:
             ctx = serialization.operation_context_from_dict(context_dict)
+
+            session = ctx.model.node_instance._session
+            original_refresh = session.refresh
+
+            def patched_refresh(target):
+                instrument.clear(target)
+                original_refresh(target)
+
+            def patched_commit():
+                messenger.apply_tracked_changes(instrument.tracked_changes)
+                instrument.clear()
+
+            session.autoflush = False
+            session.commit = patched_commit
+            session.refresh = patched_refresh
+
             task_func = imports.load_attribute(operation_mapping)
             aria.install_aria_extensions()
             for decorate in process_executor.decorate():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/201ef123/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index d71190d..d4fa715 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -75,7 +75,7 @@ class _Instrumentation(object):
 
     def _register_set_attribute_listener(self, instrumented_attribute, 
attribute_type):
         def listener(target, value, *_):
-            mapi_name = api.generate_lower_name(target.__class__)
+            mapi_name = self._mapi_name(target.__class__)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             if value is None:
@@ -90,7 +90,7 @@ class _Instrumentation(object):
 
     def _register_instance_listeners(self, instrumented_class, 
instrumented_attributes):
         def listener(target, *_):
-            mapi_name = api.generate_lower_name(instrumented_class)
+            mapi_name = self._mapi_name(instrumented_class)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             for attribute_name, attribute_type in 
instrumented_attributes.items():
@@ -108,6 +108,14 @@ class _Instrumentation(object):
             sqlalchemy.event.listen(*listener_args)
             self.listeners.append(listener_args)
 
+    def clear(self, target=None):
+        if target:
+            mapi_name = self._mapi_name(target.__class__)
+            tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
+            tracked_instances.pop(target.id, None)
+        else:
+            self.tracked_changes.clear()
+
     def restore(self):
         """Remove all listeners registered by this instrumentation"""
         for listener_args in self.listeners:
@@ -120,6 +128,10 @@ class _Instrumentation(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.restore()
 
+    @staticmethod
+    def _mapi_name(instrumented_class):
+        return api.generate_lower_name(instrumented_class)
+
 
 class _Value(object):
     # You may wonder why is this a full blown class and not a named tuple. The 
reason is that
@@ -155,3 +167,4 @@ def apply_tracked_changes(tracked_changes, model):
                     if not instance:
                         instance = mapi.get(instance_id)
                     setattr(instance, attribute_name, value.current)
+    model.node_instance._session.commit()

Reply via email to