Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-68-commit-runtime-properties 2f0d610ca -> dcfd7096c (forced update)


ARIA-68 Update runtime properties during operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/dcfd7096
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/dcfd7096
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/dcfd7096

Branch: refs/heads/ARIA-68-commit-runtime-properties
Commit: dcfd7096c0f18c709cd79e7b8418c1dc7fe59d86
Parents: db9ae9c
Author: Dan Kilman <d...@gigaspaces.com>
Authored: Tue Jan 24 16:58:10 2017 +0200
Committer: Dan Kilman <d...@gigaspaces.com>
Committed: Wed Jan 25 12:24:38 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py | 87 +++++++++++++-------
 aria/storage/instrumentation.py                 | 18 +++-
 .../test_process_executor_tracked_changes.py    | 62 +++++++++++++-
 tests/storage/test_instrumentation.py           | 38 +++++++++
 4 files changed, 173 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dcfd7096/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 9fa0302..6961b64 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -29,6 +29,7 @@ script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
 
+import contextlib
 import io
 import threading
 import socket
@@ -136,37 +137,39 @@ class ProcessExecutor(base.BaseExecutor):
         while not self._stopped:
             try:
                 # Accept messages written to the server socket
-                message = self._recv_message()
-                message_type = message['type']
-                if message_type == 'closed':
-                    break
-                task_id = message['task_id']
-                if message_type == 'started':
-                    self._task_started(self._tasks[task_id])
-                elif message_type == 'succeeded':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_succeeded(task)
-                elif message_type == 'failed':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_failed(task, exception=message['exception'])
-                else:
-                    raise RuntimeError('Invalid state')
+                with contextlib.closing(self._server_socket.accept()[0]) as connection:
+                    message = self._recv_message(connection)
+                    message_type = message['type']
+                    if message_type == 'closed':
+                        break
+                    task_id = message['task_id']
+                    if message_type == 'started':
+                        self._task_started(self._tasks[task_id])
+                    elif message_type == 'apply_tracked_changes':
+                        task = self._tasks[task_id]
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                    elif message_type == 'succeeded':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_succeeded(task)
+                    elif message_type == 'failed':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_failed(task, exception=message['exception'])
+                    else:
+                        raise RuntimeError('Invalid state')
             except BaseException as e:
                 self.logger.debug('Error in process executor listener: {0}'.format(e))
 
-    def _recv_message(self):
-        connection, _ = self._server_socket.accept()
-        try:
-            message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
-            return jsonpickle.loads(self._recv_bytes(connection, message_len))
-        finally:
-            connection.close()
+    def _recv_message(self, connection):
+        message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
+        return jsonpickle.loads(self._recv_bytes(connection, message_len))
 
     @staticmethod
     def _recv_bytes(connection, count):
@@ -247,6 +250,9 @@ class _Messenger(object):
         """Task failed message"""
         self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
 
+    def apply_tracked_changes(self, tracked_changes):
+        self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes)
+
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
@@ -263,10 +269,34 @@ class _Messenger(object):
             })
             sock.send(struct.pack(_INT_FMT, len(data)))
             sock.sendall(data)
+            # send message will block until the server side closes the connection socket
+            # because we want it to be synchronous
+            sock.recv(1)
         finally:
             sock.close()
 
 
+def _patch_session(ctx, messenger, instrument):
+    # model will be None only in tests that test the executor component directly
+    if not ctx.model:
+        return
+
+    session = ctx.model.node_instance._session
+    original_refresh = session.refresh
+
+    def patched_refresh(target):
+        instrument.clear(target)
+        original_refresh(target)
+
+    def patched_commit():
+        messenger.apply_tracked_changes(instrument.tracked_changes)
+        instrument.clear()
+
+    session.autoflush = False
+    session.commit = patched_commit
+    session.refresh = patched_refresh
+
+
 def _main():
     arguments_json_path = sys.argv[1]
     with open(arguments_json_path) as f:
@@ -292,6 +322,7 @@ def _main():
     with instrumentation.track_changes() as instrument:
         try:
             ctx = serialization.operation_context_from_dict(context_dict)
+            _patch_session(ctx=ctx, messenger=messenger, instrument=instrument)
             task_func = imports.load_attribute(operation_mapping)
             aria.install_aria_extensions()
             for decorate in process_executor.decorate():
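
A minimal sketch (not from the commit itself) of the session-patching idea behind
_patch_session above: commit() ships the instrumentation's tracked changes to the
parent process instead of writing to the database, and refresh() first drops the
refreshed instance's tracked changes so they cannot mask the reloaded state. The
_Fake* classes below are simplified stand-ins for the real SQLAlchemy session,
_Messenger and _Instrumentation objects, not ARIA APIs.

class _FakeInstrument(object):
    def __init__(self):
        self.tracked_changes = {}

    def clear(self, target=None):
        # The real _Instrumentation keys changes by model name and instance id;
        # this stand-in keys them by the instance itself, purely for illustration
        if target is not None:
            self.tracked_changes.pop(target, None)
        else:
            self.tracked_changes.clear()


class _FakeMessenger(object):
    def apply_tracked_changes(self, tracked_changes):
        # The real _Messenger sends a synchronous 'apply_tracked_changes' message
        # to the parent process over a socket (see the diff above)
        print('shipping to parent process: {0}'.format(tracked_changes))


class _FakeSession(object):
    autoflush = True

    def commit(self):
        print('writing to storage')

    def refresh(self, target):
        print('reloading {0} from storage'.format(target))


def patch_session_sketch(session, messenger, instrument):
    original_refresh = session.refresh

    def patched_refresh(target):
        # Drop this instance's tracked changes before reloading it, so the
        # refreshed state is not later overwritten by stale tracked values
        instrument.clear(target)
        original_refresh(target)

    def patched_commit():
        # Ship everything tracked so far to the parent process, then start
        # tracking from a clean slate
        messenger.apply_tracked_changes(instrument.tracked_changes)
        instrument.clear()

    session.autoflush = False
    session.commit = patched_commit
    session.refresh = patched_refresh

With the session patched this way, any commit issued from inside an operation is
turned into a synchronous 'apply_tracked_changes' message, which is why the
listener above gained that message type.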

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dcfd7096/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index d71190d..537dbb5 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -75,7 +75,7 @@ class _Instrumentation(object):
 
     def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
         def listener(target, value, *_):
-            mapi_name = api.generate_lower_name(target.__class__)
+            mapi_name = self._mapi_name(target.__class__)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             if value is None:
@@ -90,7 +90,7 @@ class _Instrumentation(object):
 
     def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
         def listener(target, *_):
-            mapi_name = api.generate_lower_name(instrumented_class)
+            mapi_name = self._mapi_name(instrumented_class)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             for attribute_name, attribute_type in instrumented_attributes.items():
@@ -108,6 +108,14 @@ class _Instrumentation(object):
             sqlalchemy.event.listen(*listener_args)
             self.listeners.append(listener_args)
 
+    def clear(self, target=None):
+        if target:
+            mapi_name = self._mapi_name(target.__class__)
+            tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
+            tracked_instances.pop(target.id, None)
+        else:
+            self.tracked_changes.clear()
+
     def restore(self):
         """Remove all listeners registered by this instrumentation"""
         for listener_args in self.listeners:
@@ -120,6 +128,10 @@ class _Instrumentation(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.restore()
 
+    @staticmethod
+    def _mapi_name(instrumented_class):
+        return api.generate_lower_name(instrumented_class)
+
 
 class _Value(object):
     # You may wonder why is this a full blown class and not a named tuple. The reason is that
@@ -155,3 +167,5 @@ def apply_tracked_changes(tracked_changes, model):
                     if not instance:
                         instance = mapi.get(instance_id)
                     setattr(instance, attribute_name, value.current)
+            if instance:
+                mapi.update(instance)
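
A hedged usage sketch of the instrumentation API touched here, following the tests
added further below; MockModel1 and the storage fixture are borrowed from
tests/storage/test_instrumentation.py, so this reads against those fixtures rather
than as a standalone script.

from aria.storage import instrumentation

with instrumentation.track_changes({MockModel1.dict1: dict}) as instrument:
    instance1 = MockModel1(name='name1')
    instance2 = MockModel1(name='name2')
    for instance in (instance1, instance2):
        storage.mock_model_1.put(instance)
    instance1.dict1 = {'new': 'value'}
    instance2.dict1 = {'new2': 'value2'}

    # tracked_changes maps model api name -> instance id -> attribute -> _Value,
    # e.g. {'mock_model_1': {instance1.id: {'dict1': ...}, instance2.id: {...}}}

    # New in this commit: forget the changes tracked for a single instance...
    instrument.clear(instance1)
    assert instance1.id not in instrument.tracked_changes['mock_model_1']

    # ...or forget everything tracked so far
    instrument.clear()
    assert instrument.tracked_changes == {}

Note that apply_tracked_changes() now also calls mapi.update(instance) after
setting the tracked attributes, so the receiving side persists what it just
applied.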

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dcfd7096/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 1564292..bd1fa96 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import copy
+
 import pytest
 
 from aria.orchestrator.workflows import api
@@ -52,18 +54,51 @@ def _update_runtime_properties(context):
     context.node_instance.runtime_properties.update(_TEST_RUNTIME_PROPERTIES)
 
 
-def _run_workflow(context, executor, op_func):
+def test_refresh_state_of_tracked_attributes(context, executor):
+    out = _run_workflow(context=context, executor=executor, op_func=_mock_refreshing_operation)
+    assert out['initial'] == out['after_refresh']
+    assert out['initial'] != out['after_change']
+
+
+def test_apply_tracked_changes_during_an_operation(context, executor):
+    inputs = {
+        'committed': {'some': 'new', 'properties': 'right here'},
+        'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'}
+    }
+
+    expected_initial = context.model.node_instance.get_by_name(
+        mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties
+
+    out = _run_workflow(context=context, executor=executor, op_func=_mock_updating_operation,
+                        inputs=inputs)
+
+    expected_after_update = expected_initial.copy()
+    expected_after_update.update(inputs['committed'])
+    expected_after_change = expected_after_update.copy()
+    expected_after_change.update(inputs['changed_but_refreshed'])
+    expected_after_refresh = expected_after_update
+
+    assert out['initial'] == expected_initial
+    assert out['after_update'] == expected_after_update
+    assert out['after_change'] == expected_after_change
+    assert out['after_refresh'] == expected_after_refresh
+
+
+def _run_workflow(context, executor, op_func, inputs=None):
     @workflow
     def mock_workflow(ctx, graph):
         node_instance = ctx.model.node_instance.get_by_name(
             mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
         node_instance.node.operations['test.op'] = {'operation': _operation_mapping(op_func)}
-        task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op')
+        task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op',
+                                                    inputs=inputs or {})
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
     eng.execute()
+    return context.model.node_instance.get_by_name(
+        mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties.get('out')
 
 
 @operation
@@ -77,6 +112,29 @@ def _mock_fail_operation(ctx):
     raise RuntimeError
 
 
+@operation
+def _mock_refreshing_operation(ctx):
+    out = {'initial': copy.deepcopy(ctx.node_instance.runtime_properties)}
+    ctx.node_instance.runtime_properties.update({'some': 'new', 'properties': 'right here'})
+    out['after_change'] = copy.deepcopy(ctx.node_instance.runtime_properties)
+    ctx.model.node_instance.refresh(ctx.node_instance)
+    out['after_refresh'] = copy.deepcopy(ctx.node_instance.runtime_properties)
+    ctx.node_instance.runtime_properties['out'] = out
+
+
+@operation
+def _mock_updating_operation(ctx, committed, changed_but_refreshed):
+    out = {'initial': copy.deepcopy(ctx.node_instance.runtime_properties)}
+    ctx.node_instance.runtime_properties.update(committed)
+    ctx.model.node_instance.update(ctx.node_instance)
+    out['after_update'] = copy.deepcopy(ctx.node_instance.runtime_properties)
+    ctx.node_instance.runtime_properties.update(changed_but_refreshed)
+    out['after_change'] = copy.deepcopy(ctx.node_instance.runtime_properties)
+    ctx.model.node_instance.refresh(ctx.node_instance)
+    out['after_refresh'] = copy.deepcopy(ctx.node_instance.runtime_properties)
+    ctx.node_instance.runtime_properties['out'] = out
+
+
 def _operation_mapping(func):
     return '{name}.{func.__name__}'.format(name=__name__, func=func)
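
These tests exercise what the change gives operation authors: committing runtime
properties in the middle of an operation and refreshing away uncommitted local
changes. A hedged sketch of how a real operation might use this follows; the
'install' operation and its long-running work are hypothetical, and the decorator
import path is an assumption, while the ctx calls mirror the mock operations above.

from aria.orchestrator.decorators import operation  # import path assumed


@operation
def install(ctx):
    # Record progress and commit it right away; the parent process applies the
    # tracked runtime_properties update synchronously, so the stored model shows
    # 'installing' before the long-running work below starts
    ctx.node_instance.runtime_properties['state'] = 'installing'
    ctx.model.node_instance.update(ctx.node_instance)

    # ... hypothetical long-running work ...

    # Discard any uncommitted local changes and reload the stored state
    ctx.model.node_instance.refresh(ctx.node_instance)

    ctx.node_instance.runtime_properties['state'] = 'installed'
    # Anything still tracked here is applied when the operation finishes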
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/dcfd7096/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 8e7f9aa..8b826e9 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -232,6 +232,44 @@ class TestInstrumentation(object):
         assert instance2_1.dict1 == {'initial': 'value', 'new': 'value'}
         assert instance2_2.list1 == ['initial', 'new_value']
 
+    def test_clear_instance(self, storage):
+        instance1 = MockModel1(name='name1')
+        instance2 = MockModel1(name='name2')
+        for instance in [instance1, instance2]:
+            storage.mock_model_1.put(instance)
+        instrument = self._track_changes({MockModel1.dict1: dict})
+        instance1.dict1 = {'new': 'value'}
+        instance2.dict1 = {'new2': 'value2'}
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
+                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
+            }
+        }
+        instrument.clear(instance1)
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
+            }
+        }
+
+    def test_clear_all(self, storage):
+        instance1 = MockModel1(name='name1')
+        instance2 = MockModel1(name='name2')
+        for instance in [instance1, instance2]:
+            storage.mock_model_1.put(instance)
+        instrument = self._track_changes({MockModel1.dict1: dict})
+        instance1.dict1 = {'new': 'value'}
+        instance2.dict1 = {'new2': 'value2'}
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
+                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
+            }
+        }
+        instrument.clear()
+        assert instrument.tracked_changes == {}
+
     def _track_changes(self, instrumented):
         instrument = instrumentation.track_changes(instrumented)
         instruments_holder.append(instrument)
