Repository: incubator-ariatosca Updated Branches: refs/heads/runtime_props_to_attr 1c7347e6d -> 8e60d3f44
made the dict like decorator more robust, added tests. removed the instrumentation entirely. added some session closing and engine disposing Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8e60d3f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8e60d3f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8e60d3f4 Branch: refs/heads/runtime_props_to_attr Commit: 8e60d3f447d3739f9becf0092d9ba4e240271bf9 Parents: 1c7347e Author: max-orlov <[email protected]> Authored: Tue May 16 19:54:21 2017 +0300 Committer: max-orlov <[email protected]> Committed: Tue May 16 19:54:21 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/common.py | 151 +++++++++++++++++++ aria/orchestrator/context/operation.py | 78 +--------- .../execution_plugin/ctx_proxy/server.py | 6 +- aria/orchestrator/workflows/executor/process.py | 125 +++------------ aria/storage/instrumentation.py | 10 +- tests/orchestrator/context/test_operation.py | 126 +++++++++++++++- .../orchestrator/execution_plugin/test_local.py | 66 ++++---- tests/orchestrator/workflows/core/test_task.py | 2 +- ...process_executor_concurrent_modifications.py | 12 +- .../executor/test_process_executor_extension.py | 6 +- .../test_process_executor_tracked_changes.py | 51 +++---- 11 files changed, 382 insertions(+), 251 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 0854a27..260ccea 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -18,6 +18,7 @@ A common context for both workflow and operation """ import logging +import collections from contextlib import contextmanager from functools import partial @@ -195,3 +196,153 @@ class BaseContext(object): variables.setdefault('ctx', self) resource_template = jinja2.Template(resource_content) return resource_template.render(variables) + + +class _Dict(collections.MutableMapping): + def __init__(self, actor, model, nested=None): + super(_Dict, self).__init__() + self._actor = actor + self._attributes = self._actor.attributes + self._model = model + self._attr_cls = self._model.parameter.model_cls + self._nested = nested or [] + + def __delitem__(self, key): + del self._nested_value[key] + + def __contains__(self, item): + for key in self.keys(): + if item == key: + return True + return False + + def __len__(self): + return len(self._nested_value) + + def __nonzero__(self): + return bool(self._nested_value) + + def __getitem__(self, item): + if self._nested: + value = self._nested_value[item] + else: + value = self._attributes[item].value + if isinstance(value, dict): + return _Dict(self._actor, self._model, nested=self._nested + [item]) + elif isinstance(value, self._attr_cls): + return value.value + return value + + def __setitem__(self, key, value): + if self._nested or key in self._attributes: + attribute = self._update_attr(key, value) + self._model.parameter.update(attribute) + else: + attr = self._attr_cls.wrap(key, value) + self._attributes[key] = attr + self._model.parameter.put(attr) + + @property + def _nested_value(self): + current = self._attributes + for k in self._nested: + current = current[k] + return current.value if isinstance(current, self._attr_cls) else current + + def _update_attr(self, key, value): + current = self._attributes + + # If this is nested, lets extract the Parameter itself + if self._nested: + attribute = current = current[self._nested[0]] + for k in self._nested[1:]: + current = current[k] + if isinstance(current, self._attr_cls): + current.value[key] = value + else: + current[key] = value + elif isinstance(current[key], self._attr_cls): + attribute = current[key] + attribute.value = value + else: + raise BaseException() + + # Since this a user defined parameter, this doesn't track changes. So we override the entire + # thing. + if isinstance(attribute.value, dict): + value = attribute.value.copy() + attribute.value.clear() + attribute.value = value + return attribute + + def _unwrap(self, attr): + return attr.unwrap() if isinstance(attr, self._attr_cls) else attr + + def keys(self): + dict_ = (self._nested_value.value + if isinstance(self._nested_value, self._attr_cls) + else self._nested_value) + for key in dict_.keys(): + yield key + + def values(self): + for val in self._nested_value.values(): + if isinstance(val, self._attr_cls): + yield val.value + else: + yield val + + def items(self): + for key in self._nested_value: + val = self._nested_value[key] + if isinstance(val, self._attr_cls): + yield key, val.value + else: + yield key, val + + def __dict__(self): + return dict(item for item in self.items()) + + def __iter__(self): + for key in self._nested_value.keys(): + yield key + + def __copy__(self): + return dict((k, v) for k, v in self.items()) + + def __deepcopy__(self, *args, **kwargs): + return self.__copy__() + + def copy(self): + return self.__copy__() + + def clear(self): + self._nested_value.clear() + + def update(self, dict_=None, **kwargs): + if dict_: + for key, value in dict_.items(): + self[key] = value + + for key, value in kwargs.items(): + self[key] = value + + +class DecorateAttributes(dict): + + def __init__(self, func): + super(DecorateAttributes, self).__init__() + self._func = func + + def __getattr__(self, item): + try: + return getattr(self._actor, item) + except AttributeError: + return super(DecorateAttributes, self).__getattribute__(item) + + def __call__(self, *args, **kwargs): + func_self = args[0] + self._actor = self._func(*args, **kwargs) + self._model = func_self.model + self.attributes = _Dict(self._actor, self._model) + return self http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 2182db5..f4e8813 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -21,72 +21,10 @@ import threading import aria from aria.utils import file -from .common import BaseContext +from . import common -class _DecorateAttributes(object): - - class _Attributes(object): - def __init__(self, model, actor): - self._model = model - self._actor = actor - self._attributes = actor.attributes - self._attr_cls = self._model.parameter.model_cls - - def __getitem__(self, item): - return self._attributes[item].value - - def __setitem__(self, key, value): - if key in self._attributes: - self._attributes[key].value = value - self._model.parameter.update(self._attributes[key]) - else: - attr = self._attr_cls.wrap(key, value) - self._attributes[key] = attr - self._model.parameter.put(attr) - - def update(self, dict_=None, **kwargs): - if dict_: - for key, value in dict_.items(): - self[key] = value - - for key, value in kwargs.items(): - self[key] = value - - def keys(self): - for attr in self._attributes.values(): - yield attr.unwrap()[0] - - def values(self): - for attr in self._attributes.values(): - yield attr.unwrap()[1] - - def items(self): - for attr in self._attributes.values(): - yield attr.unwrap() - - def __iter__(self): - for attr in self._attributes.values(): - yield attr.unwrap()[0] - - def __init__(self, func): - self._func = func - - def __getattr__(self, item): - try: - return getattr(self._actor, item) - except AttributeError: - return super(_DecorateAttributes, self).__getattribute__(item) - - def __call__(self, *args, **kwargs): - func_self = args[0] - self._actor = self._func(*args, **kwargs) - self._model = func_self.model - self.attributes = self._Attributes(self._model, self._actor) - return self - - -class BaseOperationContext(BaseContext): +class BaseOperationContext(common.BaseContext): """ Context object used during operation creation and execution """ @@ -167,7 +105,7 @@ class NodeOperationContext(BaseOperationContext): """ @property - @_DecorateAttributes + @common.DecorateAttributes def node_template(self): """ the node of the current operation @@ -176,7 +114,7 @@ class NodeOperationContext(BaseOperationContext): return self.node.node_template @property - @_DecorateAttributes + @common.DecorateAttributes def node(self): """ The node instance of the current operation @@ -191,7 +129,7 @@ class RelationshipOperationContext(BaseOperationContext): """ @property - @_DecorateAttributes + @common.DecorateAttributes def source_node_template(self): """ The source node @@ -200,7 +138,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.source_node.node_template @property - @_DecorateAttributes + @common.DecorateAttributes def source_node(self): """ The source node instance @@ -209,7 +147,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.relationship.source_node @property - @_DecorateAttributes + @common.DecorateAttributes def target_node_template(self): """ The target node @@ -218,7 +156,7 @@ class RelationshipOperationContext(BaseOperationContext): return self.target_node.node_template @property - @_DecorateAttributes + @common.DecorateAttributes def target_node(self): """ The target node instance http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 52a5312..4173c76 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -104,7 +104,11 @@ class CtxProxy(object): try: typed_request = json.loads(request) args = typed_request['args'] - payload = _process_ctx_request(self.ctx, args) + try: + payload = _process_ctx_request(self.ctx, args) + except BaseException: + self.ctx.model.log._session.close() + raise result_type = 'result' if isinstance(payload, exceptions.ScriptException): payload = dict(message=str(payload)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index c3962ed..f3a08ec 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -43,7 +43,6 @@ import jsonpickle import aria from aria.orchestrator.workflows.executor import base -from aria.storage import instrumentation from aria.extension import process_executor from aria.utils import ( imports, @@ -82,7 +81,6 @@ class ProcessExecutor(base.BaseExecutor): 'started': self._handle_task_started_request, 'succeeded': self._handle_task_succeeded_request, 'failed': self._handle_task_failed_request, - 'apply_tracked_changes': self._handle_apply_tracked_changes_request } # Server socket used to accept task status messages from subprocesses @@ -196,41 +194,13 @@ class ProcessExecutor(base.BaseExecutor): def _handle_task_started_request(self, task_id, **kwargs): self._task_started(self._tasks[task_id]) - def _handle_task_succeeded_request(self, task_id, request, **kwargs): + def _handle_task_succeeded_request(self, task_id, **kwargs): task = self._remove_task(task_id) - try: - self._apply_tracked_changes(task, request) - except BaseException as e: - e.message += UPDATE_TRACKED_CHANGES_FAILED_STR - self._task_failed(task, exception=e) - else: - self._task_succeeded(task) + self._task_succeeded(task) def _handle_task_failed_request(self, task_id, request, **kwargs): task = self._remove_task(task_id) - try: - self._apply_tracked_changes(task, request) - except BaseException as e: - e.message += 'Task failed due to {0}.'.format(request['exception']) + \ - UPDATE_TRACKED_CHANGES_FAILED_STR - self._task_failed( - task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) - else: - self._task_failed(task, exception=request['exception'], traceback=request['traceback']) - - def _handle_apply_tracked_changes_request(self, task_id, request, response): - task = self._tasks[task_id] - try: - self._apply_tracked_changes(task, request) - except BaseException as e: - response['exception'] = exceptions.wrap_if_needed(e) - - @staticmethod - def _apply_tracked_changes(task, request): - instrumentation.apply_tracked_changes( - tracked_changes=request['tracked_changes'], - new_instances=request['new_instances'], - model=task.context.model) + self._task_failed(task, exception=request['exception'], traceback=request['traceback']) def _send_message(connection, message): @@ -278,28 +248,19 @@ class _Messenger(object): """Task started message""" self._send_message(type='started') - def succeeded(self, tracked_changes, new_instances): + def succeeded(self): """Task succeeded message""" - self._send_message( - type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances) + self._send_message(type='succeeded') - def failed(self, tracked_changes, new_instances, exception): + def failed(self, exception): """Task failed message""" - self._send_message(type='failed', - tracked_changes=tracked_changes, - new_instances=new_instances, - exception=exception) - - def apply_tracked_changes(self, tracked_changes, new_instances): - self._send_message(type='apply_tracked_changes', - tracked_changes=tracked_changes, - new_instances=new_instances) + self._send_message(type='failed', exception=exception) def closed(self): """Executor closed message""" self._send_message(type='closed') - def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None): + def _send_message(self, type, exception=None): sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.connect(('localhost', self.port)) try: @@ -308,8 +269,6 @@ class _Messenger(object): 'task_id': self.task_id, 'exception': exceptions.wrap_if_needed(exception), 'traceback': exceptions.get_exception_as_string(*sys.exc_info()), - 'tracked_changes': tracked_changes or {}, - 'new_instances': new_instances or {} }) response = _recv_message(sock) response_exception = response.get('exception') @@ -319,40 +278,6 @@ class _Messenger(object): sock.close() -def _patch_ctx(ctx, messenger, instrument): - # model will be None only in tests that test the executor component directly - if not ctx.model: - return - - # We arbitrarily select the ``node`` mapi to extract the session from it. - # could have been any other mapi just as well - session = ctx.model.node._session - original_refresh = session.refresh - - def patched_refresh(target): - instrument.clear(target) - original_refresh(target) - - def patched_commit(): - messenger.apply_tracked_changes(instrument.tracked_changes, - instrument.new_instances_as_dict) - instrument.expunge_session() - instrument.clear() - - def patched_rollback(): - # Rollback is performed on parent process when commit fails - instrument.expunge_session() - - # when autoflush is set to true (the default), refreshing an object will trigger - # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so - # far on the session. this is not the desired behavior in the subprocess - session.autoflush = False - - session.commit = patched_commit - session.rollback = patched_rollback - session.refresh = patched_refresh - - def _main(): arguments_json_path = sys.argv[1] with open(arguments_json_path) as f: @@ -370,32 +295,24 @@ def _main(): operation_inputs = arguments['operation_inputs'] context_dict = arguments['context'] - # This is required for the instrumentation work properly. - # See docstring of `remove_mutable_association_listener` for further details - modeling_types.remove_mutable_association_listener() try: ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) except BaseException as e: - messenger.failed(exception=e, tracked_changes=None, new_instances=None) + messenger.failed(e) return - with instrumentation.track_changes(ctx.model) as instrument: - try: - messenger.started() - _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument) - task_func = imports.load_attribute(implementation) - aria.install_aria_extensions() - for decorate in process_executor.decorate(): - task_func = decorate(task_func) - task_func(ctx=ctx, **operation_inputs) - messenger.succeeded(tracked_changes=instrument.tracked_changes, - new_instances=instrument.new_instances_as_dict) - except BaseException as e: - messenger.failed(exception=e, - tracked_changes=instrument.tracked_changes, - new_instances=instrument.new_instances_as_dict) - finally: - instrument.expunge_session() + try: + messenger.started() + task_func = imports.load_attribute(implementation) + aria.install_aria_extensions() + for decorate in process_executor.decorate(): + task_func = decorate(task_func) + task_func(ctx=ctx, **operation_inputs) + messenger.succeeded() + except BaseException as e: + ctx.model.log._session.close() + ctx.model.log._engine.dispose() + messenger.failed(e) if __name__ == '__main__': _main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index a11bb28..eb5ff6c 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -34,7 +34,7 @@ _INSTRUMENTED = { _models.Task.status: str, _models.Node.attributes: collection, # TODO: add support for pickled type - # _models.Parameter._value: some_type + _models.Parameter._value: lambda x: x }, 'new': (_models.Log, ), @@ -106,7 +106,6 @@ class _Instrumentation(object): for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items(): self._register_attribute_listener(instrumented_attribute=instrumented_attribute, attribute_type=attribute_type) - # TODO: Revisit this, why not? if not isinstance(attribute_type, _Collection): instrumented_class = instrumented_attribute.parent.entity instrumented_class_attributes = instrumented_attribute_classes.setdefault( @@ -146,6 +145,7 @@ class _Instrumentation(object): def _register_append_to_attribute_listener(self, collection_attr): def listener(target, value, initiator): + import pydevd; pydevd.settrace('localhost', suspend=False) tracked_instances = self.tracked_changes.setdefault(target.__modelname__, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) collection_attr = tracked_attributes.setdefault(initiator.key, []) @@ -161,13 +161,11 @@ class _Instrumentation(object): def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): def listener(target, value, *_): + import pydevd; pydevd.settrace('localhost', suspend=False) mapi_name = target.__modelname__ tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) - if value is None: - current = None - else: - current = copy.deepcopy(attribute_type(value)) + current = copy.deepcopy(attribute_type(value)) if value else None tracked_attributes[instrumented_attribute.key] = _Value(_STUB, current) return current listener_args = (instrumented_attribute, 'set', listener) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 6802593..ca5154c 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -25,7 +25,9 @@ from aria import ( operation, ) from aria.orchestrator import context +from aria.orchestrator.context import common from aria.orchestrator.workflows import api +from aria.modeling.models import Parameter import tests from tests import ( @@ -263,7 +265,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @pytest.fixture(params=[ - (thread.ThreadExecutor, {}), + # (thread.ThreadExecutor, {}), (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request): @@ -495,4 +497,124 @@ def attribute_altering_operation(ctx, attributes_dict, **_): def attribute_consuming_operation(ctx, holder_path, **_): holder = helpers.FilesystemDataHolder(holder_path) ctx.target_node.attributes.update(ctx.source_node.attributes) - holder.update(ctx.source_node.attributes) + holder.update(**ctx.source_node.attributes) + + +class MockActor(object): + def __init__(self): + self.attributes = {} + + +class MockModel(object): + + def __init__(self): + self.parameter = type('MockModel', (object, ), {'model_cls': Parameter, + 'put': lambda *args, **kwargs: None, + 'update': lambda *args, **kwargs: None})() + + +class TestDict(): + + @pytest.fixture + def actor(self): + return MockActor() + + @pytest.fixture + def model(self): + return MockModel() + + def test_keys(self, model, actor): + dict_ = common._Dict(actor, model) + actor.attributes.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2')} + ) + assert sorted(dict_.keys()) == sorted(['key1', 'key2']) + + def test_values(self, model, actor): + dict_ = common._Dict(actor, model) + actor.attributes.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2')} + ) + assert sorted(dict_.values()) == sorted(['value1', 'value2']) + + def test_items(self, actor, model): + dict_ = common._Dict(actor, model) + actor.attributes.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2')} + ) + assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')]) + + def test_iter(self, actor, model): + dict_ = common._Dict(actor, model) + actor.attributes.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2')} + ) + assert sorted(list(dict_)) == sorted(['key1', 'key2']) + + def test_bool(self, actor, model): + dict_ = common._Dict(actor, model) + assert not dict_ + actor.attributes.update({ + 'key1': Parameter.wrap('key1', 'value1'), + 'key2': Parameter.wrap('key1', 'value2')} + ) + assert dict_ + + def test_set_item(self, actor, model): + dict_ = common._Dict(actor, model) + dict_['key1'] = Parameter.wrap('key1', 'value1') + assert 'key1' in dict_ + assert isinstance(dict_._attributes['key1'], Parameter) + assert dict_['key1'] == 'value1' + + dict_['key1'] = {} + dict_['key1']['inner_key'] = 'value2' + + assert isinstance(dict_._attributes['key1'], Parameter) + assert len(dict_) == 1 + assert 'inner_key' in dict_['key1'] + assert isinstance(dict_['key1'], common._Dict) + assert dict_['key1']['inner_key'] == 'value2' + + def test_get_item(self, actor, model): + dict_ = common._Dict(actor, model) + dict_['key1'] = Parameter.wrap('key1', 'value1') + + assert isinstance(dict_._attributes['key1'], Parameter) + + def test_update(self, actor, model): + dict_ = common._Dict(actor, model) + dict_['key1'] = 'value1' + + new_dict = {'key2': 'value2'} + dict_.update(new_dict) + assert len(dict_) == 2 + assert dict_['key2'] == 'value2' + assert isinstance(dict_._attributes['key2'], Parameter) + + new_dict = {} + new_dict.update(dict_) + assert new_dict['key1'] == dict_['key1'] + + def test_copy(self, actor, model): + dict_ = common._Dict(actor, model) + dict_['key1'] = 'value1' + + new_dict = dict_.copy() + assert new_dict is not dict_ + assert new_dict == dict_ + + dict_['key1'] = 'value2' + assert new_dict['key1'] == 'value1' + assert dict_['key1'] == 'value2' + + def test_clear(self, actor, model): + dict_ = common._Dict(actor, model) + dict_['key1'] = 'value1' + dict_.clear() + + assert len(dict_) == 0 \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 09d0499..d9115e1 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -43,26 +43,26 @@ class TestLocalRunScript(object): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties map.key value + ctx node attributes map.key value ''', windows_script=''' - ctx node runtime-properties map.key value + ctx node attributes map.key value ''') props = self._run( executor, workflow_context, script_path=script_path) - assert props['map']['key'] == 'value' + assert props['map'].value['key'] == 'value' def test_process_env(self, executor, workflow_context, tmpdir): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties map.key1 $key1 - ctx node runtime-properties map.key2 $key2 + ctx node attributes map.key1 $key1 + ctx node attributes map.key2 $key2 ''', windows_script=''' - ctx node runtime-properties map.key1 %key1% - ctx node runtime-properties map.key2 %key2% + ctx node attributes map.key1 %key1% + ctx node attributes map.key2 %key2% ''') props = self._run( executor, workflow_context, @@ -73,7 +73,7 @@ class TestLocalRunScript(object): 'key2': 'value2' } }) - p_map = props['map'] + p_map = props['map'].value assert p_map['key1'] == 'value1' assert p_map['key2'] == 'value2' @@ -81,10 +81,10 @@ class TestLocalRunScript(object): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties map.cwd $PWD + ctx node attributes map.cwd $PWD ''', windows_script=''' - ctx node runtime-properties map.cwd %CD% + ctx node attributes map.cwd %CD% ''') tmpdir = str(tmpdir) props = self._run( @@ -93,11 +93,11 @@ class TestLocalRunScript(object): process={ 'cwd': tmpdir }) - p_map = props['map'] + p_map = props['map'].value assert p_map['cwd'] == tmpdir def test_process_command_prefix(self, executor, workflow_context, tmpdir): - use_ctx = 'ctx node runtime-properties map.key value' + use_ctx = 'ctx node attributes map.key value' python_script = ['import subprocess', 'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)] python_script = '\n'.join(python_script) @@ -114,19 +114,19 @@ class TestLocalRunScript(object): 'env': {'TEST_KEY': 'value'}, 'command_prefix': 'python' }) - p_map = props['map'] + p_map = props['map'].value assert p_map['key'] == 'value' def test_process_args(self, executor, workflow_context, tmpdir): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties map.arg1 "$1" - ctx node runtime-properties map.arg2 $2 + ctx node attributes map.arg1 "$1" + ctx node attributes map.arg2 $2 ''', windows_script=''' - ctx node runtime-properties map.arg1 %1 - ctx node runtime-properties map.arg2 %2 + ctx node attributes map.arg1 %1 + ctx node attributes map.arg2 %2 ''') props = self._run( executor, workflow_context, @@ -134,8 +134,8 @@ class TestLocalRunScript(object): process={ 'args': ['"arg with spaces"', 'arg2'] }) - assert props['map']['arg1'] == 'arg with spaces' - assert props['map']['arg2'] == 'arg2' + assert props['map'].value['arg1'] == 'arg with spaces' + assert props['map'].value['arg2'] == 'arg2' def test_no_script_path(self, executor, workflow_context): exception = self._run_and_get_task_exception( @@ -187,7 +187,7 @@ class TestLocalRunScript(object): script = ''' from aria.orchestrator.execution_plugin import ctx, inputs if __name__ == '__main__': - ctx.node.runtime_properties['key'] = inputs['key'] + ctx.node.attributes['key'] = inputs['key'] ''' suffix = '.py' script_path = self._create_script( @@ -200,7 +200,7 @@ if __name__ == '__main__': executor, workflow_context, script_path=script_path, inputs={'key': 'value'}) - assert props['key'] == 'value' + assert props['key'].value == 'value' @pytest.mark.parametrize( 'value', ['string-value', [1, 2, 3], 999, 3.14, False, @@ -209,16 +209,17 @@ if __name__ == '__main__': script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties key "${input_as_env_var}" + ctx node attributes key "${input_as_env_var}" ''', windows_script=''' - ctx node runtime-properties key "%input_as_env_var%" + ctx node attributes key "%input_as_env_var%" ''') props = self._run( executor, workflow_context, script_path=script_path, env_var=value) - expected = props['key'] if isinstance(value, basestring) else json.loads(props['key']) + value = props['key'].value + expected = value if isinstance(value, basestring) else json.loads(value) assert expected == value @pytest.mark.parametrize('value', ['override', {'key': 'value'}]) @@ -227,10 +228,10 @@ if __name__ == '__main__': script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties key "${input_as_env_var}" + ctx node attributes key "${input_as_env_var}" ''', windows_script=''' - ctx node runtime-properties key "%input_as_env_var%" + ctx node attributes key "%input_as_env_var%" ''') props = self._run( @@ -242,17 +243,18 @@ if __name__ == '__main__': 'input_as_env_var': value } }) - expected = props['key'] if isinstance(value, basestring) else json.loads(props['key']) + value = props['key'].value + expected = value if isinstance(value, basestring) else json.loads(value) assert expected == value def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir): script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx node runtime-properties nonexistent + ctx node attributes nonexistent ''', windows_script=''' - ctx node runtime-properties nonexistent + ctx node attributes nonexistent ''') exception = self._run_and_get_task_exception( executor, workflow_context, @@ -266,10 +268,10 @@ if __name__ == '__main__': script_path = self._create_script( tmpdir, linux_script='''#! /bin/bash -e - ctx -j instance runtime-properties nonexistent + ctx -j instance attributes nonexistent ''', windows_script=''' - ctx -j instance runtime-properties nonexistent + ctx -j instance attributes nonexistent ''') exception = self._run_and_get_task_exception( executor, workflow_context, @@ -502,7 +504,7 @@ if __name__ == '__main__': tasks_graph=tasks_graph) eng.execute() return workflow_context.model.node.get_by_name( - mock.models.DEPENDENCY_NODE_NAME).runtime_properties + mock.models.DEPENDENCY_NODE_NAME).attributes @pytest.fixture def executor(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index 50ca7f5..b1b8251 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -100,7 +100,7 @@ class TestOperationTask(object): storage_task = ctx.model.task.get_by_name(core_task.name) assert storage_task.plugin is storage_plugin assert storage_task.execution_name == ctx.execution.name - assert storage_task.actor == core_task.context.node + assert storage_task.actor == core_task.context.node._actor assert core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.implementation == api_task.implementation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py index 1dbfae1..8ed2f82 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py @@ -31,6 +31,8 @@ from tests import mock from tests import storage +# TODO: rethink this entire module + def test_concurrent_modification_on_task_succeeded(context, executor, lock_files): _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True) @@ -62,8 +64,7 @@ def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value): if not first: try: ctx.model.node.update(node) - except StorageError as e: - assert 'Version conflict' in str(e) + except StorageError: ctx.model.node.refresh(node) else: raise RuntimeError('Unexpected') @@ -118,8 +119,8 @@ def _test(context, executor, lock_files, func, expected_failure): except ExecutorException: pass - props = _node(context).runtime_properties - assert props[key] == first_value + props = _node(context).attributes + assert props[key].value == first_value exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])] if expected_failure: @@ -151,7 +152,6 @@ def lock_files(tmpdir): def _concurrent_update(lock_files, node, key, first_value, second_value): - locker1 = fasteners.InterProcessLock(lock_files[0]) locker2 = fasteners.InterProcessLock(lock_files[1]) @@ -165,7 +165,7 @@ def _concurrent_update(lock_files, node, key, first_value, second_value): else: locker2.acquire() - node.runtime_properties[key] = first_value if first else second_value + node.attributes[key] = first_value if first else second_value if first: locker1.release() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 878ac24..30b23ed 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -56,7 +56,7 @@ def test_decorate_extension(context, executor): graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) eng.execute() - out = get_node(context).runtime_properties['out'] + out = get_node(context).attributes.get('out').value assert out['wrapper_inputs'] == inputs assert out['function_inputs'] == inputs @@ -67,7 +67,7 @@ class MockProcessExecutorExtension(object): def decorate(self): def decorator(function): def wrapper(ctx, **operation_inputs): - ctx.node.runtime_properties['out'] = {'wrapper_inputs': operation_inputs} + ctx.node.attributes['out'] = {'wrapper_inputs': operation_inputs} function(ctx=ctx, **operation_inputs) return wrapper return decorator @@ -75,7 +75,7 @@ class MockProcessExecutorExtension(object): @operation def _mock_operation(ctx, **operation_inputs): - ctx.node.runtime_properties['out']['function_inputs'] = operation_inputs + ctx.node.attributes['out']['function_inputs'] = operation_inputs @pytest.fixture http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e60d3f4/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 4fbe9c1..e2c7f83 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -28,7 +28,7 @@ from tests import mock from tests import storage -_TEST_RUNTIME_PROPERTIES = { +_TEST_ATTRIBUTES = { 'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo' } @@ -46,12 +46,13 @@ def test_track_changes_of_failed_operation(context, executor): def _assert_tracked_changes_are_applied(context): instance = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - assert instance.runtime_properties == _TEST_RUNTIME_PROPERTIES + assert all(instance.attributes[key].value == value + for key, value in _TEST_ATTRIBUTES.items()) -def _update_runtime_properties(context): - context.node.runtime_properties.clear() - context.node.runtime_properties.update(_TEST_RUNTIME_PROPERTIES) +def _update_attributes(context): + context.node.attributes.clear() + context.node.attributes.update(_TEST_ATTRIBUTES) def test_refresh_state_of_tracked_attributes(context, executor): @@ -66,11 +67,9 @@ def test_apply_tracked_changes_during_an_operation(context, executor): 'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'} } - expected_initial = context.model.node.get_by_name( - mock.models.DEPENDENCY_NODE_NAME).runtime_properties - - out = _run_workflow(context=context, executor=executor, op_func=_mock_updating_operation, - inputs=inputs) + expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes + out = _run_workflow( + context=context, executor=executor, op_func=_mock_updating_operation, inputs=inputs) expected_after_update = expected_initial.copy() expected_after_update.update(inputs['committed']) # pylint: disable=no-member @@ -109,42 +108,42 @@ def _run_workflow(context, executor, op_func, inputs=None): graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) eng.execute() - return context.model.node.get_by_name( - mock.models.DEPENDENCY_NODE_NAME).runtime_properties.get('out') + out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out') + return out.value if out else None @operation def _mock_success_operation(ctx): - _update_runtime_properties(ctx) + _update_attributes(ctx) @operation def _mock_fail_operation(ctx): - _update_runtime_properties(ctx) + _update_attributes(ctx) raise RuntimeError @operation def _mock_refreshing_operation(ctx): - out = {'initial': copy.deepcopy(ctx.node.runtime_properties)} - ctx.node.runtime_properties.update({'some': 'new', 'properties': 'right here'}) - out['after_change'] = copy.deepcopy(ctx.node.runtime_properties) + out = {'initial': copy.deepcopy(ctx.node.attributes)} + ctx.node.attributes.update({'some': 'new', 'properties': 'right here'}) + out['after_change'] = copy.deepcopy(ctx.node.attributes) ctx.model.node.refresh(ctx.node) - out['after_refresh'] = copy.deepcopy(ctx.node.runtime_properties) - ctx.node.runtime_properties['out'] = out + out['after_refresh'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes['out'] = out @operation def _mock_updating_operation(ctx, committed, changed_but_refreshed): - out = {'initial': copy.deepcopy(ctx.node.runtime_properties)} - ctx.node.runtime_properties.update(committed) + out = {'initial': copy.deepcopy(ctx.node.attributes)} + ctx.node.attributes.update(committed) ctx.model.node.update(ctx.node) - out['after_update'] = copy.deepcopy(ctx.node.runtime_properties) - ctx.node.runtime_properties.update(changed_but_refreshed) - out['after_change'] = copy.deepcopy(ctx.node.runtime_properties) + out['after_update'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes.update(changed_but_refreshed) + out['after_change'] = copy.deepcopy(ctx.node.attributes) ctx.model.node.refresh(ctx.node) - out['after_refresh'] = copy.deepcopy(ctx.node.runtime_properties) - ctx.node.runtime_properties['out'] = out + out['after_refresh'] = copy.deepcopy(ctx.node.attributes) + ctx.node.attributes['out'] = out def _operation_mapping(func):
