Repository: incubator-ariatosca Updated Branches: refs/heads/runtime_props_to_attr [created] 87dad5511
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/87dad551 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/87dad551 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/87dad551 Branch: refs/heads/runtime_props_to_attr Commit: 87dad55110c75f3853cc1da4cd0e3e82d58a1678 Parents: fdd57c4 Author: max-orlov <[email protected]> Authored: Sun May 14 22:38:39 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun May 14 22:38:39 2017 +0300 ---------------------------------------------------------------------- aria/modeling/service_common.py | 9 +- aria/orchestrator/context/operation.py | 68 ++++++++++++++ aria/orchestrator/workflows/core/engine.py | 1 - aria/orchestrator/workflows/executor/process.py | 7 +- aria/storage/instrumentation.py | 96 ++++++++++++++------ tests/helpers.py | 7 ++ tests/orchestrator/context/test_operation.py | 74 ++++++++++++++- 7 files changed, 224 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/modeling/service_common.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_common.py b/aria/modeling/service_common.py index e9c96a4..ef19c8e 100644 --- a/aria/modeling/service_common.py +++ b/aria/modeling/service_common.py @@ -218,14 +218,13 @@ class ParameterBase(TemplateModelMixin, caching.HasCachedMethods): :type description: basestring """ - from . import models type_name = canonical_type_name(value) if type_name is None: type_name = full_type_name(value) - return models.Parameter(name=name, # pylint: disable=unexpected-keyword-arg - type_name=type_name, - value=value, - description=description) + return cls(name=name, # pylint: disable=unexpected-keyword-arg + type_name=type_name, + value=value, + description=description) class TypeBase(InstanceModelMixin): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 68a02aa..2c3f173 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -24,6 +24,68 @@ from aria.utils import file from .common import BaseContext +class _DecorateAttributes(object): + + class _Attributes(object): + def __init__(self, model, actor): + self._model = model + self._actor = actor + self._attributes = actor.attributes + self._attr_cls = self._model.parameter.model_cls + + def __getitem__(self, item): + return self._attributes[item].value + + def __setitem__(self, key, value): + if key in self._attributes: + self._attributes[key].value = value + self._model.parameter.update(self._attributes[key]) + else: + attr = self._attr_cls.wrap(key, value) + self._attributes[key] = attr + self._model.parameter.put(attr) + + def update(self, dict_=None, **kwargs): + if dict_: + for key, value in dict_.items(): + self[key] = value + + for key, value in kwargs.items(): + self[key] = value + + def keys(self): + for attr in self._attributes.values(): + yield attr.unwrap()[0] + + def values(self): + for attr in self._attributes.values(): + yield attr.unwrap()[1] + + def items(self): + for attr in self._attributes.values(): + yield attr.unwrap() + + def __iter__(self): + for attr in self._attributes.values(): + yield attr.unwrap()[0] + + def __init__(self, func): + self._func = func + + def __getattr__(self, item): + try: + return getattr(self._actor, item) + except AttributeError: + return super(_DecorateAttributes, self).__getattribute__(item) + + def __call__(self, *args, **kwargs): + func_self = args[0] + actor = self._func(*args, **kwargs) + model = func_self.model + self.attributes = self._Attributes(model, actor) + return self + + class BaseOperationContext(BaseContext): """ Context object used during operation creation and execution @@ -105,6 +167,7 @@ class NodeOperationContext(BaseOperationContext): """ @property + @_DecorateAttributes def node_template(self): """ the node of the current operation @@ -113,6 +176,7 @@ class NodeOperationContext(BaseOperationContext): return self.node.node_template @property + @_DecorateAttributes def node(self): """ The node instance of the current operation @@ -127,6 +191,7 @@ class RelationshipOperationContext(BaseOperationContext): """ @property + @_DecorateAttributes def source_node_template(self): """ The source node @@ -135,6 +200,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.source_node.node_template @property + @_DecorateAttributes def source_node(self): """ The source node instance @@ -143,6 +209,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.relationship.source_node @property + @_DecorateAttributes def target_node_template(self): """ The target node @@ -151,6 +218,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.target_node.node_template @property + @_DecorateAttributes def target_node(self): """ The target node instance http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 561265c..3a96804 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -69,7 +69,6 @@ class Engine(logger.LoggerMixin): else: events.on_success_workflow_signal.send(self._workflow_context) except BaseException as e: - events.on_failure_workflow_signal.send(self._workflow_context, exception=e) raise http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 824c4e1..c3962ed 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -334,7 +334,8 @@ def _patch_ctx(ctx, messenger, instrument): original_refresh(target) def patched_commit(): - messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances) + messenger.apply_tracked_changes(instrument.tracked_changes, + instrument.new_instances_as_dict) instrument.expunge_session() instrument.clear() @@ -388,11 +389,11 @@ def _main(): task_func = decorate(task_func) task_func(ctx=ctx, **operation_inputs) messenger.succeeded(tracked_changes=instrument.tracked_changes, - new_instances=instrument.new_instances) + new_instances=instrument.new_instances_as_dict) except BaseException as e: messenger.failed(exception=e, tracked_changes=instrument.tracked_changes, - new_instances=instrument.new_instances) + new_instances=instrument.new_instances_as_dict) finally: instrument.expunge_session() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 390f933..701c058 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -25,13 +25,18 @@ from ..storage.exceptions import StorageError _VERSION_ID_COL = 'version' _STUB = object() +_Collection = type('_Collection', (object, ), {}) + +collection = _Collection() _INSTRUMENTED = { 'modified': { - _models.Node.runtime_properties: dict, _models.Node.state: str, _models.Task.status: str, + _models.Node.attributes: collection, + # TODO: add support for pickled type + # _models.Parameter._value: some_type }, - 'new': (_models.Log, ) + 'new': (_models.Log, ), } @@ -70,7 +75,7 @@ class _Instrumentation(object): def __init__(self, model, instrumented): self.tracked_changes = {} - self.new_instances = {} + self.new_instances_as_dict = {} self.listeners = [] self._instances_to_expunge = [] self._model = model @@ -93,25 +98,26 @@ class _Instrumentation(object): def _track_changes(self, instrumented): instrumented_attribute_classes = {} + # Track any newly created instances. + for instrumented_class in instrumented.get('new', []): + self._register_new_instance_listener(instrumented_class) + # Track any newly-set attributes. for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): - self._register_set_attribute_listener( - instrumented_attribute=instrumented_attribute, - attribute_type=attribute_type) - instrumented_class = instrumented_attribute.parent.entity - instrumented_class_attributes = instrumented_attribute_classes.setdefault( - instrumented_class, {}) - instrumented_class_attributes[instrumented_attribute.key] = attribute_type + self._register_attribute_listener(instrumented_attribute=instrumented_attribute, + attribute_type=attribute_type) + # TODO: Revisit this, why not? + if not isinstance(attribute_type, _Collection): + instrumented_class = instrumented_attribute.parent.entity + instrumented_class_attributes = instrumented_attribute_classes.setdefault( + instrumented_class, {}) + instrumented_class_attributes[instrumented_attribute.key] = attribute_type # Track any global instance update such as 'refresh' or 'load' for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items(): self._register_instance_listeners(instrumented_class=instrumented_class, instrumented_attributes=instrumented_attributes) - # Track any newly created instances. - for instrumented_class in instrumented.get('new', {}): - self._register_new_instance_listener(instrumented_class) - def _register_new_instance_listener(self, instrumented_class): if self._model is None: raise StorageError("In order to keep track of new instances, a ctx is needed") @@ -120,7 +126,7 @@ class _Instrumentation(object): if not isinstance(instance, instrumented_class): return self._instances_to_expunge.append(instance) - tracked_instances = self.new_instances.setdefault(instance.__modelname__, {}) + tracked_instances = self.new_instances_as_dict.setdefault(instance.__modelname__, {}) tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {}) instance_as_dict = instance.to_dict() instance_as_dict.update((k, getattr(instance, k)) @@ -131,6 +137,28 @@ class _Instrumentation(object): sqlalchemy.event.listen(*listener_args) self.listeners.append(listener_args) + def _register_attribute_listener(self, instrumented_attribute, attribute_type): + # Track and newly created instances that are a part of a collection. + if isinstance(attribute_type, _Collection): + return self._register_append_to_attribute_listener(instrumented_attribute) + else: + return self._register_set_attribute_listener(instrumented_attribute, attribute_type) + + def _register_append_to_attribute_listener(self, collection_attr): + def listener(target, value, initiator): + tracked_instances = self.tracked_changes.setdefault(target.__modelname__, {}) + tracked_attributes = tracked_instances.setdefault(target.id, {}) + collection = tracked_attributes.setdefault(initiator.key, []) + instance_as_dict = value.to_dict() + instance_as_dict.update((k, getattr(value, k)) + for k in getattr(value, '__private_fields__', [])) + instance_as_dict['_MODEL_CLS'] = value.__modelname__ + collection.append(instance_as_dict) + + listener_args = (collection_attr, 'append', listener) + sqlalchemy.event.listen(*listener_args) + self.listeners.append(listener_args) + def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): def listener(target, value, *_): mapi_name = target.__modelname__ @@ -179,7 +207,7 @@ class _Instrumentation(object): else: self.tracked_changes.clear() - self.new_instances.clear() + self.new_instances_as_dict.clear() self._instances_to_expunge = [] def restore(self): @@ -230,27 +258,39 @@ def apply_tracked_changes(tracked_changes, new_instances, model): for mapi_name, tracked_instances in tracked_changes.items(): successfully_updated_changes[mapi_name] = dict() mapi = getattr(model, mapi_name) + + # Handle new instances + for mapi_name, new_instance in new_instances.items(): + successfully_updated_changes[mapi_name] = dict() + mapi = getattr(model, mapi_name) + for tmp_id, new_instance_kwargs in new_instance.items(): + instance = mapi.model_cls(**new_instance_kwargs) + mapi.put(instance) + successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs + new_instance[tmp_id] = instance + for instance_id, tracked_attributes in tracked_instances.items(): successfully_updated_changes[mapi_name][instance_id] = dict() instance = None for attribute_name, value in tracked_attributes.items(): - if value.initial != value.current: - instance = instance or mapi.get(instance_id) + instance = instance or mapi.get(instance_id) + if isinstance(value, list): + # The changes are new item to a collection + for item in value: + model_name = item.pop('_MODEL_CLS') + attr_model = getattr(model, model_name).model_cls + new_attr = attr_model(**item) + getattr(instance, attribute_name)[new_attr] = new_attr + elif value.initial != value.current: + # scalar attribute setattr(instance, attribute_name, value.current) if instance: _validate_version_id(instance, mapi) mapi.update(instance) - successfully_updated_changes[mapi_name][instance_id] = [ - v.dict for v in tracked_attributes.values()] + # TODO: reinstate this + # successfully_updated_changes[mapi_name][instance_id] = [ + # v.dict for v in tracked_attributes.values()] - # Handle new instances - for mapi_name, new_instance in new_instances.items(): - successfully_updated_changes[mapi_name] = dict() - mapi = getattr(model, mapi_name) - for new_instance_kwargs in new_instance.values(): - instance = mapi.model_cls(**new_instance_kwargs) - mapi.put(instance) - successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs except BaseException: for key, value in successfully_updated_changes.items(): if not value: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/tests/helpers.py ---------------------------------------------------------------------- diff --git a/tests/helpers.py b/tests/helpers.py index 3c3efc9..133e33b 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -67,6 +67,13 @@ class FilesystemDataHolder(object): self._dump(dict_) return return_value + def update(self, dict_=None, **kwargs): + current_dict = self._load() + if dict_: + current_dict.update(dict_) + current_dict.update(**kwargs) + self._dump(current_dict) + @property def path(self): return self._path http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/87dad551/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index cdeb5fa..14868e8 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -263,7 +263,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @pytest.fixture(params=[ - (thread.ThreadExecutor, {}), + # (thread.ThreadExecutor, {}), (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request): @@ -343,6 +343,66 @@ def test_relationship_operation_logging(ctx, executor): _assert_loggins(ctx, inputs) +def test_attribute_consumption(ctx, executor, dataholder): + # region Updating node operation + node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0] # Standard.install + + source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + + inputs = {'attributes_dict': {'key': 'value'}} + interface = mock.models.create_interface( + source_node.service, + node_int_name, + node_op_name, + operation_kwargs=dict( + implementation=op_path(attribute_altering_operation, module_path=__name__), + inputs=inputs) + ) + source_node.interfaces[interface.name] = interface + ctx.model.node.update(source_node) + # endregion + + # region updating relationship operation + rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2] # Configure.add_source + + relationship = ctx.model.relationship.list()[0] + interface = mock.models.create_interface( + relationship.source_node.service, + rel_int_name, + rel_op_name, + operation_kwargs=dict( + implementation=op_path(attribute_consuming_operation, module_path=__name__), + inputs={'holder_path': dataholder.path} + ) + ) + relationship.interfaces[interface.name] = interface + ctx.model.relationship.update(relationship) + # endregion + + @workflow + def basic_workflow(graph, **_): + graph.sequence( + api.task.OperationTask( + source_node, + interface_name=node_int_name, + operation_name=node_op_name, + inputs=inputs + ), + api.task.OperationTask( + relationship, + interface_name=rel_int_name, + operation_name=rel_op_name, + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + + assert len(source_node.attributes) == len(target_node.attributes) == 1 + assert source_node.attributes['key'] != target_node.attributes['key'] + assert source_node.attributes['key'].value == target_node.attributes['key'].value == dataholder['key'] + + def _assert_loggins(ctx, inputs): # The logs should contain the following: Workflow Start, Operation Start, custom operation @@ -422,3 +482,15 @@ def get_node_id(ctx, holder_path, **_): def _test_plugin_workdir(ctx, filename, content): with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f: f.write(content) + + +@operation +def attribute_altering_operation(ctx, attributes_dict, **_): + ctx.node.attributes.update(attributes_dict) + + +@operation +def attribute_consuming_operation(ctx, holder_path, **_): + holder = helpers.FilesystemDataHolder(holder_path) + ctx.target_node.attributes.update(ctx.source_node.attributes) + holder.update(ctx.source_node.attributes)
