Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-163-Update-node-state-for-stub-tasks 1303f0925 -> 33c7736f7 (forced update)
reverted the 'safe' behavior back to a function Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/33c7736f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/33c7736f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/33c7736f Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks Commit: 33c7736f7858b1821d639223e36f77f350095b91 Parents: c56ab97 Author: max-orlov <[email protected]> Authored: Wed May 3 20:04:59 2017 +0300 Committer: max-orlov <[email protected]> Committed: Thu May 4 13:37:59 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/api/task.py | 102 ++++++++++--------- .../orchestrator/workflows/builtin/workflows.py | 10 +- aria/orchestrator/workflows/core/translation.py | 26 +++-- aria/orchestrator/workflows/events_logging.py | 9 +- .../orchestrator/workflows/core/test_engine.py | 41 +++----- 5 files changed, 93 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 889a86a..52f1d66 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -72,63 +72,43 @@ class OperationTask(BaseTask): Do not call this constructor directly. Instead, use :meth:`for_node` or :meth:`for_relationship`. 
""" - - actor_type = type(actor).__name__.lower() assert isinstance(actor, (models.Node, models.Relationship)) - assert actor_type in ('node', 'relationship') - assert interface_name and operation_name super(OperationTask, self).__init__() - self.actor = actor - self.max_attempts = (self.workflow_context._task_max_attempts - if max_attempts is None else max_attempts) - self.retry_interval = (self.workflow_context._task_retry_interval - if retry_interval is None else retry_interval) - self.ignore_failure = (self.workflow_context._task_ignore_failure - if ignore_failure is None else ignore_failure) self.interface_name = interface_name self.operation_name = operation_name - self.name = OperationTask.NAME_FORMAT.format(type=actor_type, + self.max_attempts = max_attempts or self.workflow_context._task_max_attempts + self.retry_interval = retry_interval or self.workflow_context._task_retry_interval + self.ignore_failure = ignore_failure or self.workflow_context._task_ignore_failure + self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(), name=actor.name, interface=self.interface_name, operation=self.operation_name) - if self.is_empty: - self.plugin = None - self.inputs = {} - self.implementation = None - else: - operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] - self.plugin = operation.plugin - self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) - self.implementation = operation.implementation + # Creating OperationTask directly should raise an error when there is no + # interface/operation. 
+ + if not has_operation(self.actor, self.interface_name, self.operation_name): + raise exceptions.OperationNotFoundException( + 'Could not find operation "{self.operation_name}" on interface ' + '"{self.interface_name}" for {actor_type} "{actor.name}"'.format( + self=self, + actor_type=type(actor).__name__.lower(), + actor=actor) + ) + + operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] + self.plugin = operation.plugin + self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs) + self.implementation = operation.implementation def __repr__(self): return self.name - # def __new__(cls, actor, interface_name, operation_name, *args, **kwargs): - # """ - # Returns a new operation task if the operation exists in the node, otherwise returns None. - # """ - # try: - # cls.is_empty(actor, interface_name, operation_name) - # return super(OperationTask, cls).__new__(cls) - # except exceptions.OperationNotFoundException: - # return None - - @property - def is_empty(self): - interface = self.actor.interfaces.get(self.interface_name) - if interface: - operation = interface.operations.get(self.operation_name) - if operation: - return operation.implementation is None - raise exceptions.OperationNotFoundException( - 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' - .format(self.operation_name, - self.interface_name, - type(self.actor).__name__.lower(), - self.actor.name)) +class StubTask(BaseTask): + """ + Enables creating empty tasks. + """ class WorkflowTask(BaseTask): @@ -161,6 +141,27 @@ class WorkflowTask(BaseTask): return super(WorkflowTask, self).__getattribute__(item) +def create_task(actor, interface_name, operation_name, **kwargs): + """ + This helper function enables safe creation of OperationTask, if the supplied interface and + operation have no implementation, None is returned. 
+ :param actor: the actor for this task + :param interface_name: the name of the interface + :param operation_name: the name of the operation + :param kwargs: any additional kwargs to be passed to the task OperationTask + :return: an OperationTask or None (if no interface/operation exists) + """ + try: + return OperationTask( + actor, + interface_name=interface_name, + operation_name=operation_name, + **kwargs + ) + except exceptions.OperationNotFoundException: + return None + + def create_relationships_tasks( node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): """ @@ -197,7 +198,7 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam operations = [] if source_operation_name: operations.append( - OperationTask( + create_task( relationship, interface_name=interface_name, operation_name=source_operation_name, @@ -206,7 +207,7 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam ) if target_operation_name: operations.append( - OperationTask( + create_task( relationship, interface_name=interface_name, operation_name=target_operation_name, @@ -215,3 +216,12 @@ def create_relationship_tasks(relationship, interface_name, source_operation_nam ) return operations + + +def has_operation(actor, interface_name, operation_name): + interface = actor.interfaces.get(interface_name) + if interface: + operation = interface.operations.get(operation_name) + if operation and operation.implementation: + return True + return False http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index 1fc9eed..b286e98 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -69,14 +69,14 @@ 
__all__ = ( @workflow(suffix_template='{node.name}') def install_node(graph, node, **kwargs): # Create - sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)] + sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)] # Configure sequence += task.create_relationships_tasks(node, NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_PRE_CONFIGURE_SOURCE, NORMATIVE_PRE_CONFIGURE_TARGET) - sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE)) + sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE)) sequence += task.create_relationships_tasks(node, NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_POST_CONFIGURE_SOURCE, @@ -93,7 +93,7 @@ def uninstall_node(graph, node, **kwargs): sequence = _create_stop_tasks(node) # Delete - sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE)) + sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE)) graph.sequence(*sequence) @@ -109,7 +109,7 @@ def stop_node(graph, node, **kwargs): def _create_start_tasks(node): - sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)] + sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)] sequence += task.create_relationships_tasks(node, NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET) @@ -117,7 +117,7 @@ def _create_start_tasks(node): def _create_stop_tasks(node): - sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)] + sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)] sequence += task.create_relationships_tasks(node, NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/core/translation.py 
---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index d764024..b31ea8a 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -44,17 +44,14 @@ def build_execution_graph( for api_task in task_graph.topological_order(reverse=True): dependencies = task_graph.get_dependencies(api_task) operation_dependencies = _get_tasks_from_dependencies( - execution_graph, - dependencies, - default=[start_task]) + execution_graph, dependencies, default=[start_task]) if isinstance(api_task, api.task.OperationTask): - if api_task.is_empty: - operation_task = core_task.OperationTask(api_task) - else: + if api_task.implementation: operation_task = core_task.OperationTask(api_task, executor=default_executor) + else: + operation_task = core_task.OperationTask(api_task) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) - elif isinstance(api_task, api.task.WorkflowTask): # Build the graph recursively while adding start and end markers build_execution_graph( @@ -65,6 +62,9 @@ def build_execution_graph( end_cls=core_task.EndSubWorkflowTask, depends_on=operation_dependencies ) + elif isinstance(api_task, api.task.StubTask): + stub_task = core_task.StubTask(id=api_task.id) + _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) else: raise RuntimeError('Undefined state') @@ -87,10 +87,14 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): """ Returns task list from dependencies. 
""" - return [execution_graph.node[dependency.id - if isinstance(dependency, api.task.OperationTask) - else _end_graph_suffix(dependency.id)]['task'] - for dependency in dependencies] or default + tasks = [] + for dependency in dependencies: + if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)): + dependency_id = dependency.id + else: + dependency_id = _end_graph_suffix(dependency.id) + tasks.append(execution_graph.node[dependency_id]['task']) + return tasks or default def _start_graph_suffix(id): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index b031146..9913012 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -36,12 +36,9 @@ def _get_task_name(task): @events.start_task_signal.connect def _start_task_handler(task, **kwargs): # If the task has not implementation this is an empty task. - if task.implementation: - task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} has no ' - 'implementation'.format(name=_get_task_name(task), task=task)) - else: - task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...' - .format(name=_get_task_name(task), task=task)) + suffix = 'started...' 
if task.implementation else 'has no implementation' + task.context.logger.debug('{name} {task.interface_name}.{task.operation_name} {suffix}' + .format(name=_get_task_name(task), task=task, suffix=suffix)) @events.on_success_task_signal.connect http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/33c7736f/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index c9911dc..8c0705b 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -56,40 +56,27 @@ class BaseTest(object): @staticmethod def _op(ctx, - func=None, + func, inputs=None, max_attempts=None, retry_interval=None, - ignore_failure=None, - is_stub=False): + ignore_failure=None): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - - if not is_stub: - operation_kwargs = dict(implementation='{name}.{func.__name__}'.format( - name=__name__, func=func)) - if inputs: - # the operation has to declare the inputs before those may be passed - operation_kwargs['inputs'] = inputs - - interface = mock.models.create_interface( - node.service, - 'aria.interfaces.lifecycle', - 'create', - operation_kwargs=operation_kwargs - ) - else: - interface = mock.models.create_interface( - node.service, - 'aria.interfaces.lifecycle', - 'create', - ) - + interface_name = 'aria.interfaces.lifecycle' + operation_kwargs = dict(implementation='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if inputs: + # the operation has to declare the inputs before those may be passed + operation_kwargs['inputs'] = inputs + operation_name = 'create' + interface = mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) node.interfaces[interface.name] = interface return api.task.OperationTask( node, 
interface_name='aria.interfaces.lifecycle', - operation_name='create', + operation_name=operation_name, inputs=inputs or {}, max_attempts=max_attempts, retry_interval=retry_interval, @@ -218,7 +205,7 @@ class TestEngine(BaseTest): @workflow def sub_workflow(ctx, graph): op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1}) - op2 = self._op(ctx, is_stub=True) + op2 = api.task.StubTask() op3 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2}) graph.sequence(op1, op2, op3) @@ -231,7 +218,7 @@ class TestEngine(BaseTest): assert workflow_context.states == ['start', 'success'] assert workflow_context.exception is None assert global_test_holder.get('invocations') == [1, 2] - assert global_test_holder.get('sent_task_signal_calls') == 3 + assert global_test_holder.get('sent_task_signal_calls') == 2 class TestCancel(BaseTest):
