Repository: incubator-ariatosca
Updated Branches:
  refs/heads/runtime_props_to_attr [created] 87dad5511


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/87dad551
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/87dad551
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/87dad551

Branch: refs/heads/runtime_props_to_attr
Commit: 87dad55110c75f3853cc1da4cd0e3e82d58a1678
Parents: fdd57c4
Author: max-orlov <[email protected]>
Authored: Sun May 14 22:38:39 2017 +0300
Committer: max-orlov <[email protected]>
Committed: Sun May 14 22:38:39 2017 +0300

----------------------------------------------------------------------
 aria/modeling/service_common.py                 |  9 +-
 aria/orchestrator/context/operation.py          | 68 ++++++++++++++
 aria/orchestrator/workflows/core/engine.py      |  1 -
 aria/orchestrator/workflows/executor/process.py |  7 +-
 aria/storage/instrumentation.py                 | 96 ++++++++++++++------
 tests/helpers.py                                |  7 ++
 tests/orchestrator/context/test_operation.py    | 74 ++++++++++++++-
 7 files changed, 224 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/modeling/service_common.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_common.py b/aria/modeling/service_common.py
index e9c96a4..ef19c8e 100644
--- a/aria/modeling/service_common.py
+++ b/aria/modeling/service_common.py
@@ -218,14 +218,13 @@ class ParameterBase(TemplateModelMixin, caching.HasCachedMethods):
         :type description: basestring
         """
 
-        from . import models
         type_name = canonical_type_name(value)
         if type_name is None:
             type_name = full_type_name(value)
-        return models.Parameter(name=name, # pylint: disable=unexpected-keyword-arg
-                                type_name=type_name,
-                                value=value,
-                                description=description)
+        return cls(name=name, # pylint: disable=unexpected-keyword-arg
+                   type_name=type_name,
+                   value=value,
+                   description=description)
 
 
 class TypeBase(InstanceModelMixin):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 68a02aa..2c3f173 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -24,6 +24,68 @@ from aria.utils import file
 from .common import BaseContext
 
 
+class _DecorateAttributes(object):
+
+    class _Attributes(object):
+        def __init__(self, model, actor):
+            self._model = model
+            self._actor = actor
+            self._attributes = actor.attributes
+            self._attr_cls = self._model.parameter.model_cls
+
+        def __getitem__(self, item):
+            return self._attributes[item].value
+
+        def __setitem__(self, key, value):
+            if key in self._attributes:
+                self._attributes[key].value = value
+                self._model.parameter.update(self._attributes[key])
+            else:
+                attr = self._attr_cls.wrap(key, value)
+                self._attributes[key] = attr
+                self._model.parameter.put(attr)
+
+        def update(self, dict_=None, **kwargs):
+            if dict_:
+                for key, value in dict_.items():
+                    self[key] = value
+
+            for key, value in kwargs.items():
+                self[key] = value
+
+        def keys(self):
+            for attr in self._attributes.values():
+                yield attr.unwrap()[0]
+
+        def values(self):
+            for attr in self._attributes.values():
+                yield attr.unwrap()[1]
+
+        def items(self):
+            for attr in self._attributes.values():
+                yield attr.unwrap()
+
+        def __iter__(self):
+            for attr in self._attributes.values():
+                yield attr.unwrap()[0]
+
+    def __init__(self, func):
+        self._func = func
+
+    def __getattr__(self, item):
+        try:
+            return getattr(self._actor, item)
+        except AttributeError:
+            return super(_DecorateAttributes, self).__getattribute__(item)
+
+    def __call__(self, *args, **kwargs):
+        func_self = args[0]
+        actor = self._func(*args, **kwargs)
+        model = func_self.model
+        self.attributes = self._Attributes(model, actor)
+        return self
+
+
 class BaseOperationContext(BaseContext):
     """
     Context object used during operation creation and execution
@@ -105,6 +167,7 @@ class NodeOperationContext(BaseOperationContext):
     """
 
     @property
+    @_DecorateAttributes
     def node_template(self):
         """
         the node of the current operation
@@ -113,6 +176,7 @@ class NodeOperationContext(BaseOperationContext):
         return self.node.node_template
 
     @property
+    @_DecorateAttributes
     def node(self):
         """
         The node instance of the current operation
@@ -127,6 +191,7 @@ class RelationshipOperationContext(BaseOperationContext):
     """
 
     @property
+    @_DecorateAttributes
     def source_node_template(self):
         """
         The source node
@@ -135,6 +200,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.source_node.node_template
 
     @property
+    @_DecorateAttributes
     def source_node(self):
         """
         The source node instance
@@ -143,6 +209,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.relationship.source_node
 
     @property
+    @_DecorateAttributes
     def target_node_template(self):
         """
         The target node
@@ -151,6 +218,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.target_node.node_template
 
     @property
+    @_DecorateAttributes
     def target_node(self):
         """
         The target node instance

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 561265c..3a96804 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -69,7 +69,6 @@ class Engine(logger.LoggerMixin):
             else:
                 events.on_success_workflow_signal.send(self._workflow_context)
         except BaseException as e:
-
             events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
             raise
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 824c4e1..c3962ed 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -334,7 +334,8 @@ def _patch_ctx(ctx, messenger, instrument):
         original_refresh(target)
 
     def patched_commit():
-        messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
+        messenger.apply_tracked_changes(instrument.tracked_changes,
+                                        instrument.new_instances_as_dict)
         instrument.expunge_session()
         instrument.clear()
 
@@ -388,11 +389,11 @@ def _main():
                 task_func = decorate(task_func)
             task_func(ctx=ctx, **operation_inputs)
             messenger.succeeded(tracked_changes=instrument.tracked_changes,
-                                new_instances=instrument.new_instances)
+                                new_instances=instrument.new_instances_as_dict)
         except BaseException as e:
             messenger.failed(exception=e,
                              tracked_changes=instrument.tracked_changes,
-                             new_instances=instrument.new_instances)
+                             new_instances=instrument.new_instances_as_dict)
         finally:
             instrument.expunge_session()
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 390f933..701c058 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -25,13 +25,18 @@ from ..storage.exceptions import StorageError
 
 _VERSION_ID_COL = 'version'
 _STUB = object()
+_Collection = type('_Collection', (object, ), {})
+
+collection = _Collection()
 _INSTRUMENTED = {
     'modified': {
-        _models.Node.runtime_properties: dict,
         _models.Node.state: str,
         _models.Task.status: str,
+        _models.Node.attributes: collection,
+        # TODO: add support for pickled type
+        # _models.Parameter._value: some_type
     },
-    'new': (_models.Log, )
+    'new': (_models.Log, ),
 
 }
 
@@ -70,7 +75,7 @@ class _Instrumentation(object):
 
     def __init__(self, model, instrumented):
         self.tracked_changes = {}
-        self.new_instances = {}
+        self.new_instances_as_dict = {}
         self.listeners = []
         self._instances_to_expunge = []
         self._model = model
@@ -93,25 +98,26 @@ class _Instrumentation(object):
 
     def _track_changes(self, instrumented):
         instrumented_attribute_classes = {}
+        # Track any newly created instances.
+        for instrumented_class in instrumented.get('new', []):
+            self._register_new_instance_listener(instrumented_class)
+
         # Track any newly-set attributes.
         for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
-            self._register_set_attribute_listener(
-                instrumented_attribute=instrumented_attribute,
-                attribute_type=attribute_type)
-            instrumented_class = instrumented_attribute.parent.entity
-            instrumented_class_attributes = instrumented_attribute_classes.setdefault(
-                instrumented_class, {})
-            instrumented_class_attributes[instrumented_attribute.key] = attribute_type
+            self._register_attribute_listener(instrumented_attribute=instrumented_attribute,
+                                              attribute_type=attribute_type)
+            # TODO: revisit - why are collection attributes excluded from instance listeners?
+            if not isinstance(attribute_type, _Collection):
+                instrumented_class = instrumented_attribute.parent.entity
+                instrumented_class_attributes = instrumented_attribute_classes.setdefault(
+                    instrumented_class, {})
+                instrumented_class_attributes[instrumented_attribute.key] = attribute_type
 
         # Track any global instance update such as 'refresh' or 'load'
         for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
             self._register_instance_listeners(instrumented_class=instrumented_class,
                                               instrumented_attributes=instrumented_attributes)
 
-        # Track any newly created instances.
-        for instrumented_class in instrumented.get('new', {}):
-            self._register_new_instance_listener(instrumented_class)
-
     def _register_new_instance_listener(self, instrumented_class):
         if self._model is None:
             raise StorageError("In order to keep track of new instances, a ctx is needed")
@@ -120,7 +126,7 @@ class _Instrumentation(object):
             if not isinstance(instance, instrumented_class):
                 return
             self._instances_to_expunge.append(instance)
-            tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
+            tracked_instances = self.new_instances_as_dict.setdefault(instance.__modelname__, {})
             tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
             instance_as_dict = instance.to_dict()
             instance_as_dict.update((k, getattr(instance, k))
@@ -131,6 +137,28 @@ class _Instrumentation(object):
         sqlalchemy.event.listen(*listener_args)
         self.listeners.append(listener_args)
 
+    def _register_attribute_listener(self, instrumented_attribute, attribute_type):
+        # Track any newly created instances that are part of a collection.
+        if isinstance(attribute_type, _Collection):
+            return self._register_append_to_attribute_listener(instrumented_attribute)
+        else:
+            return self._register_set_attribute_listener(instrumented_attribute, attribute_type)
+
+    def _register_append_to_attribute_listener(self, collection_attr):
+        def listener(target, value, initiator):
+            tracked_instances = self.tracked_changes.setdefault(target.__modelname__, {})
+            tracked_attributes = tracked_instances.setdefault(target.id, {})
+            collection = tracked_attributes.setdefault(initiator.key, [])
+            instance_as_dict = value.to_dict()
+            instance_as_dict.update((k, getattr(value, k))
+                                    for k in getattr(value, '__private_fields__', []))
+            instance_as_dict['_MODEL_CLS'] = value.__modelname__
+            collection.append(instance_as_dict)
+
+        listener_args = (collection_attr, 'append', listener)
+        sqlalchemy.event.listen(*listener_args)
+        self.listeners.append(listener_args)
+
     def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
         def listener(target, value, *_):
             mapi_name = target.__modelname__
@@ -179,7 +207,7 @@ class _Instrumentation(object):
         else:
             self.tracked_changes.clear()
 
-        self.new_instances.clear()
+        self.new_instances_as_dict.clear()
         self._instances_to_expunge = []
 
     def restore(self):
@@ -230,27 +258,39 @@ def apply_tracked_changes(tracked_changes, new_instances, model):
         for mapi_name, tracked_instances in tracked_changes.items():
             successfully_updated_changes[mapi_name] = dict()
             mapi = getattr(model, mapi_name)
+
+            # Handle new instances
+            for mapi_name, new_instance in new_instances.items():
+                successfully_updated_changes[mapi_name] = dict()
+                mapi = getattr(model, mapi_name)
+                for tmp_id, new_instance_kwargs in new_instance.items():
+                    instance = mapi.model_cls(**new_instance_kwargs)
+                    mapi.put(instance)
+                    successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
+                    new_instance[tmp_id] = instance
+
             for instance_id, tracked_attributes in tracked_instances.items():
                 successfully_updated_changes[mapi_name][instance_id] = dict()
                 instance = None
                 for attribute_name, value in tracked_attributes.items():
-                    if value.initial != value.current:
-                        instance = instance or mapi.get(instance_id)
+                    instance = instance or mapi.get(instance_id)
+                    if isinstance(value, list):
+                        # The changes are new items added to a collection
+                        for item in value:
+                            model_name = item.pop('_MODEL_CLS')
+                            attr_model = getattr(model, model_name).model_cls
+                            new_attr = attr_model(**item)
+                            getattr(instance, attribute_name)[new_attr] = new_attr
+                    elif value.initial != value.current:
+                        # scalar attribute
                         setattr(instance, attribute_name, value.current)
                 if instance:
                     _validate_version_id(instance, mapi)
                     mapi.update(instance)
-                    successfully_updated_changes[mapi_name][instance_id] = [
-                        v.dict for v in tracked_attributes.values()]
+                    # TODO: reinstate this
+                    # successfully_updated_changes[mapi_name][instance_id] = [
+                    #     v.dict for v in tracked_attributes.values()]
 
-        # Handle new instances
-        for mapi_name, new_instance in new_instances.items():
-            successfully_updated_changes[mapi_name] = dict()
-            mapi = getattr(model, mapi_name)
-            for new_instance_kwargs in new_instance.values():
-                instance = mapi.model_cls(**new_instance_kwargs)
-                mapi.put(instance)
-                successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
     except BaseException:
         for key, value in successfully_updated_changes.items():
             if not value:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/tests/helpers.py
----------------------------------------------------------------------
diff --git a/tests/helpers.py b/tests/helpers.py
index 3c3efc9..133e33b 100644
--- a/tests/helpers.py
+++ b/tests/helpers.py
@@ -67,6 +67,13 @@ class FilesystemDataHolder(object):
         self._dump(dict_)
         return return_value
 
+    def update(self, dict_=None, **kwargs):
+        current_dict = self._load()
+        if dict_:
+            current_dict.update(dict_)
+        current_dict.update(**kwargs)
+        self._dump(current_dict)
+
     @property
     def path(self):
         return self._path

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index cdeb5fa..14868e8 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -263,7 +263,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
 
 @pytest.fixture(params=[
-    (thread.ThreadExecutor, {}),
+    # (thread.ThreadExecutor, {}),
     (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
@@ -343,6 +343,66 @@ def test_relationship_operation_logging(ctx, executor):
     _assert_loggins(ctx, inputs)
 
 
+def test_attribute_consumption(ctx, executor, dataholder):
+    # region Updating node operation
+    node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]   
 # Standard.install
+
+    source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+    inputs = {'attributes_dict': {'key': 'value'}}
+    interface = mock.models.create_interface(
+        source_node.service,
+        node_int_name,
+        node_op_name,
+        operation_kwargs=dict(
+            implementation=op_path(attribute_altering_operation, module_path=__name__),
+            inputs=inputs)
+    )
+    source_node.interfaces[interface.name] = interface
+    ctx.model.node.update(source_node)
+    # endregion
+
+    # region updating relationship operation
+    rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2]  # Configure.add_source
+
+    relationship = ctx.model.relationship.list()[0]
+    interface = mock.models.create_interface(
+        relationship.source_node.service,
+        rel_int_name,
+        rel_op_name,
+        operation_kwargs=dict(
+            implementation=op_path(attribute_consuming_operation, module_path=__name__),
+            inputs={'holder_path': dataholder.path}
+        )
+    )
+    relationship.interfaces[interface.name] = interface
+    ctx.model.relationship.update(relationship)
+    # endregion
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.sequence(
+            api.task.OperationTask(
+                source_node,
+                interface_name=node_int_name,
+                operation_name=node_op_name,
+                inputs=inputs
+            ),
+            api.task.OperationTask(
+                relationship,
+                interface_name=rel_int_name,
+                operation_name=rel_op_name,
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+    assert len(source_node.attributes) == len(target_node.attributes) == 1
+    assert source_node.attributes['key'] != target_node.attributes['key']
+    assert source_node.attributes['key'].value == target_node.attributes['key'].value == dataholder['key']
+
+
 def _assert_loggins(ctx, inputs):
 
     # The logs should contain the following: Workflow Start, Operation Start, custom operation
@@ -422,3 +482,15 @@ def get_node_id(ctx, holder_path, **_):
 def _test_plugin_workdir(ctx, filename, content):
     with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f:
         f.write(content)
+
+
+@operation
+def attribute_altering_operation(ctx, attributes_dict, **_):
+    ctx.node.attributes.update(attributes_dict)
+
+
+@operation
+def attribute_consuming_operation(ctx, holder_path, **_):
+    holder = helpers.FilesystemDataHolder(holder_path)
+    ctx.target_node.attributes.update(ctx.source_node.attributes)
+    holder.update(ctx.source_node.attributes)

Reply via email to