Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-163-Update-node-state-for-stub-tasks cc0ee52e4 -> e0509d9c4
removed utils from builtin workflows. the creation of the Operation Task handles some of the utils functions Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e0509d9c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e0509d9c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e0509d9c Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks Commit: e0509d9c4fa8db32e58f093009af74803d28cffd Parents: cc0ee52 Author: max-orlov <[email protected]> Authored: Mon May 1 11:13:51 2017 +0300 Committer: max-orlov <[email protected]> Committed: Mon May 1 11:13:51 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/api/task.py | 109 ++++++++++++++- .../workflows/builtin/execute_operation.py | 28 +--- aria/orchestrator/workflows/builtin/heal.py | 4 +- aria/orchestrator/workflows/builtin/install.py | 7 +- aria/orchestrator/workflows/builtin/start.py | 4 +- aria/orchestrator/workflows/builtin/stop.py | 4 +- .../orchestrator/workflows/builtin/uninstall.py | 9 +- aria/orchestrator/workflows/builtin/utils.py | 136 ------------------- .../orchestrator/workflows/builtin/workflows.py | 46 +++---- .../orchestrator/workflows/core/test_engine.py | 1 - .../tosca-simple-1.0/node-cellar/workflows.py | 10 +- 11 files changed, 147 insertions(+), 211 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index f486a16..8fce8c1 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -21,6 +21,7 @@ from ... import context from ....modeling import models from ....modeling import utils as modeling_utils from ....utils.uuid import generate_uuid +from .. import exceptions class BaseTask(object): @@ -66,8 +67,7 @@ class OperationTask(BaseTask): inputs=None, max_attempts=None, retry_interval=None, - ignore_failure=None, - is_stub=False): + ignore_failure=None): """ Do not call this constructor directly. Instead, use :meth:`for_node` or :meth:`for_relationship`. @@ -92,7 +92,7 @@ class OperationTask(BaseTask): name=actor.name, interface=self.interface_name, operation=self.operation_name) - self.is_stub = is_stub + self.is_stub = self.is_empty(self.actor, self.interface_name, self.operation_name) if self.is_stub: return @@ -104,6 +104,28 @@ class OperationTask(BaseTask): def __repr__(self): return self.name + def __new__(cls, actor, interface_name, operation_name, *args, **kwargs): + """ + Returns a new operation task if the operation exists in the node, otherwise returns None. + """ + try: + cls.is_empty(actor, interface_name, operation_name) + return super(OperationTask, cls).__new__(cls) + except exceptions.OperationNotFoundException: + return None + + @staticmethod + def is_empty(actor, interface_name, operation_name): + interface = actor.interfaces.get(interface_name) + if interface: + operation = interface.operations.get(operation_name) + if operation: + return operation.implementation is None + + raise exceptions.OperationNotFoundException( + 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' + .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name)) + class WorkflowTask(BaseTask): """ @@ -133,3 +155,84 @@ class WorkflowTask(BaseTask): return getattr(self._graph, item) except AttributeError: return super(WorkflowTask, self).__getattribute__(item) + + +def create_relationships_tasks( + node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): + """ + Creates a relationship task (source and target) for all of a node_instance relationships. + :param basestring source_operation_name: the relationship operation name. + :param basestring interface_name: the name of the interface. + :param source_operation_name: + :param target_operation_name: + :param NodeInstance node: the source_node + :return: + """ + sub_tasks = [] + for relationship in node.outbound_relationships: + relationship_operations = relationship_tasks( + relationship, + interface_name, + source_operation_name=source_operation_name, + target_operation_name=target_operation_name, + **kwargs) + sub_tasks.append(relationship_operations) + return sub_tasks + + +def relationship_tasks(relationship, interface_name, source_operation_name=None, + target_operation_name=None, **kwargs): + """ + Creates a relationship task source and target. + :param Relationship relationship: the relationship instance itself + :param source_operation_name: + :param target_operation_name: + + :return: + """ + operations = [] + if source_operation_name: + operations.append( + OperationTask( + relationship, + interface_name=interface_name, + operation_name=source_operation_name, + **kwargs + ) + ) + if target_operation_name: + operations.append( + OperationTask( + relationship, + interface_name=interface_name, + operation_name=target_operation_name, + **kwargs + ) + ) + + return [op for op in operations if op] + + +def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): + """ + Creates dependencies between tasks if there is a relationship (outbound) between their nodes. + """ + + def get_task(node_name): + for task, node in tasks_and_nodes: + if node.name == node_name: + return task + return None + + for task, node in tasks_and_nodes: + dependencies = [] + for relationship in node.outbound_relationships: + dependency = get_task(relationship.target_node.name) + if dependency: + dependencies.append(dependency) + if dependencies: + if reverse: + for dependency in dependencies: + graph.add_dependency(dependency, task) + else: + graph.add_dependency(task, dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py index 16504ec..02a654a 100644 --- a/aria/orchestrator/workflows/builtin/execute_operation.py +++ b/aria/orchestrator/workflows/builtin/execute_operation.py @@ -17,8 +17,8 @@ Builtin execute_operation workflow """ -from . import utils from ... import workflow +from ..api import task @workflow @@ -65,11 +65,11 @@ def execute_operation( # registering actual tasks to sequences for node in filtered_nodes: graph.add_tasks( - _create_node_task( - node=node, + task.OperationTask( + node, interface_name=interface_name, operation_name=operation_name, - operation_kwargs=operation_kwargs + inputs=operation_kwargs ) ) @@ -99,23 +99,3 @@ def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()): _is_node_by_id(node.id), _is_node_by_type(node.node_template.type))): yield node - - -def _create_node_task( - node, - interface_name, - operation_name, - operation_kwargs): - """ - A workflow which executes a single operation - :param node: the node instance to install - :param basestring operation: the operation name - :param dict operation_kwargs: - :return: - """ - - return utils.create_node_task( - node=node, - interface_name=interface_name, - operation_name=operation_name, - inputs=operation_kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py index 92b96ea..8c76f6c 100644 --- a/aria/orchestrator/workflows/builtin/heal.py +++ b/aria/orchestrator/workflows/builtin/heal.py @@ -103,7 +103,7 @@ def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes): graph.add_dependency(target_node_subgraph, node_sub_workflow) if target_node in failing_nodes: - dependency = relationship_tasks( + dependency = task.relationship_tasks( relationship=relationship, operation_name='aria.interfaces.relationship_lifecycle.unlink') graph.add_tasks(*dependency) @@ -157,7 +157,7 @@ def heal_install(ctx, graph, failing_nodes, targeted_nodes): graph.add_dependency(node_sub_workflow, target_node_subworkflow) if target_node in failing_nodes: - dependent = relationship_tasks( + dependent = task.relationship_tasks( relationship=relationship, operation_name='aria.interfaces.relationship_lifecycle.establish') graph.add_tasks(*dependent) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py index 2b9ec66..c4ab16e 100644 --- a/aria/orchestrator/workflows/builtin/install.py +++ b/aria/orchestrator/workflows/builtin/install.py @@ -18,15 +18,14 @@ Builtin install workflow """ from .workflows import install_node -from .utils import create_node_task_dependencies -from ..api.task import WorkflowTask from ... import workflow +from ..api import task as api_task @workflow def install(ctx, graph): tasks_and_nodes = [] for node in ctx.nodes: - tasks_and_nodes.append((WorkflowTask(install_node, node=node), node)) + tasks_and_nodes.append((api_task.WorkflowTask(install_node, node=node), node)) graph.add_tasks([task for task, _ in tasks_and_nodes]) - create_node_task_dependencies(graph, tasks_and_nodes) + api_task.create_node_task_dependencies(graph, tasks_and_nodes) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/start.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/start.py b/aria/orchestrator/workflows/builtin/start.py index ad67554..1946143 100644 --- a/aria/orchestrator/workflows/builtin/start.py +++ b/aria/orchestrator/workflows/builtin/start.py @@ -18,11 +18,11 @@ Builtin start workflow """ from .workflows import start_node -from ..api.task import WorkflowTask from ... import workflow +from ..api import task as api_task @workflow def start(ctx, graph): for node in ctx.model.node.iter(): - graph.add_tasks(WorkflowTask(start_node, node=node)) + graph.add_tasks(api_task.WorkflowTask(start_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/stop.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/stop.py b/aria/orchestrator/workflows/builtin/stop.py index 23ac366..c1b60ae 100644 --- a/aria/orchestrator/workflows/builtin/stop.py +++ b/aria/orchestrator/workflows/builtin/stop.py @@ -18,11 +18,11 @@ Builtin stop workflow """ from .workflows import stop_node -from ..api.task import WorkflowTask +from ..api import task as api_task from ... import workflow @workflow def stop(ctx, graph): for node in ctx.model.node.iter(): - graph.add_tasks(WorkflowTask(stop_node, node=node)) + graph.add_tasks(api_task.WorkflowTask(stop_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py index e4afcd9..920dabf 100644 --- a/aria/orchestrator/workflows/builtin/uninstall.py +++ b/aria/orchestrator/workflows/builtin/uninstall.py @@ -18,17 +18,14 @@ Builtin uninstall workflow """ from .workflows import uninstall_node -from .utils import create_node_task_dependencies -from ..api.task import WorkflowTask from ... import workflow +from ..api import task as api_task @workflow def uninstall(ctx, graph): tasks_and_nodes = [] for node in ctx.nodes: - tasks_and_nodes.append(( - WorkflowTask(uninstall_node, node=node), - node)) + tasks_and_nodes.append((api_task.WorkflowTask(uninstall_node, node=node), node)) graph.add_tasks([task for task, _ in tasks_and_nodes]) - create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) + api_task.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/utils.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py deleted file mode 100644 index a9b7a00..0000000 --- a/aria/orchestrator/workflows/builtin/utils.py +++ /dev/null @@ -1,136 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ..api.task import OperationTask -from .. import exceptions - - -def create_node_task(node, interface_name, operation_name, **kwargs): - """ - Returns a new operation task if the operation exists in the node, otherwise returns None. - """ - - try: - return OperationTask(node, - interface_name=interface_name, - operation_name=operation_name, - is_stub=_is_empty_task(node, interface_name, operation_name), - **kwargs) - except exceptions.OperationNotFoundException: - # We will skip nodes which do not have the operation - return None - - -def create_relationships_tasks( - node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): - """ - Creates a relationship task (source and target) for all of a node_instance relationships. - :param basestring source_operation_name: the relationship operation name. - :param basestring interface_name: the name of the interface. - :param source_operation_name: - :param target_operation_name: - :param NodeInstance node: the source_node - :return: - """ - sub_tasks = [] - for relationship in node.outbound_relationships: - relationship_operations = relationship_tasks( - relationship, - interface_name, - source_operation_name=source_operation_name, - target_operation_name=target_operation_name, - **kwargs) - sub_tasks.append(relationship_operations) - return sub_tasks - - -def relationship_tasks(relationship, interface_name, source_operation_name=None, - target_operation_name=None, **kwargs): - """ - Creates a relationship task source and target. - :param Relationship relationship: the relationship instance itself - :param source_operation_name: - :param target_operation_name: - - :return: - """ - operations = [] - if source_operation_name: - try: - operations.append( - OperationTask( - relationship, - interface_name=interface_name, - operation_name=source_operation_name, - is_stub=_is_empty_task(relationship, interface_name, source_operation_name), - **kwargs - ) - ) - except exceptions.OperationNotFoundException: - # We will skip relationships which do not have the operation - pass - if target_operation_name: - try: - operations.append( - OperationTask( - relationship, - interface_name=interface_name, - operation_name=target_operation_name, - is_stub=_is_empty_task(relationship, interface_name, target_operation_name), - **kwargs - ) - ) - except exceptions.OperationNotFoundException: - # We will skip relationships which do not have the operation - pass - - return operations - - -def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): - """ - Creates dependencies between tasks if there is a relationship (outbound) between their nodes. - """ - - def get_task(node_name): - for task, node in tasks_and_nodes: - if node.name == node_name: - return task - return None - - for task, node in tasks_and_nodes: - dependencies = [] - for relationship in node.outbound_relationships: - dependency = get_task(relationship.target_node.name) - if dependency: - dependencies.append(dependency) - if dependencies: - if reverse: - for dependency in dependencies: - graph.add_dependency(dependency, task) - else: - graph.add_dependency(task, dependencies) - - -def _is_empty_task(actor, interface_name, operation_name): - interface = actor.interfaces.get(interface_name) - if interface: - operation = interface.operations.get(operation_name) - if operation: - return operation.implementation is None - - raise exceptions.OperationNotFoundException( - 'Could not find operation "{0}" on interface "{1}" for {2} "{3}"' - .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index 60f14ed..3b3c1ec 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -18,10 +18,7 @@ TSOCA normative lifecycle workflows. """ from ... import workflow -from .utils import ( - create_node_task, - create_relationships_tasks -) +from ..api import task NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard' @@ -72,19 +69,18 @@ __all__ = ( @workflow(suffix_template='{node.name}') def install_node(graph, node, **kwargs): # Create - sequence = [create_node_task(node, - NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)] + sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)] # Configure - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_PRE_CONFIGURE_SOURCE, - NORMATIVE_PRE_CONFIGURE_TARGET) - sequence.append(create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE)) - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_POST_CONFIGURE_SOURCE, - NORMATIVE_POST_CONFIGURE_TARGET) + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_PRE_CONFIGURE_SOURCE, + NORMATIVE_PRE_CONFIGURE_TARGET) + sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE)) + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_POST_CONFIGURE_SOURCE, + NORMATIVE_POST_CONFIGURE_TARGET) # Start sequence += _create_start_tasks(node) @@ -97,9 +93,7 @@ def uninstall_node(graph, node, **kwargs): sequence = _create_stop_tasks(node) # Delete - sequence.append(create_node_task(node, - NORMATIVE_STANDARD_INTERFACE, - NORMATIVE_DELETE)) + sequence.append(task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE)) graph.sequence(*sequence) @@ -115,16 +109,16 @@ def stop_node(graph, node, **kwargs): def _create_start_tasks(node): - sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)] - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET) + sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)] + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET) return sequence def _create_stop_tasks(node): - sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)] - sequence += create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET) + sequence = [task.OperationTask(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)] + sequence += task.create_relationships_tasks(node, + NORMATIVE_CONFIGURE_INTERFACE, + NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET) return sequence http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 5a19bcd..c9911dc 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -94,7 +94,6 @@ class BaseTest(object): max_attempts=max_attempts, retry_interval=retry_interval, ignore_failure=ignore_failure, - is_stub=is_stub ) @pytest.fixture(autouse=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e0509d9c/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py ---------------------------------------------------------------------- diff --git a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py index abe1ee2..06e4f9e 100644 --- a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py +++ b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py @@ -1,5 +1,5 @@ from aria import workflow -from aria.orchestrator.workflows.builtin import utils +from aria.orchestrator.workflows.api import task from aria.orchestrator.workflows.exceptions import TaskException @@ -16,9 +16,9 @@ def maintenance(ctx, graph, enabled): for node in ctx.model.node.iter(): try: - graph.add_tasks(utils.create_node_task(node=node, - interface_name=INTERFACE_NAME, - operation_name=ENABLE_OPERATION_NAME if enabled - else DISABLE_OPERATION_NAME)) + graph.add_tasks(task.OperationTask(node, + interface_name=INTERFACE_NAME, + operation_name=ENABLE_OPERATION_NAME if enabled + else DISABLE_OPERATION_NAME)) except TaskException: pass
