Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-163-Update-node-state-for-stub-tasks 46dbed7be -> 71228058a (forced update)
review0.5 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/71228058 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/71228058 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/71228058 Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks Commit: 71228058a7be329bc7166b12be08561bf93de10b Parents: 94cb2a1 Author: max-orlov <[email protected]> Authored: Wed May 3 14:24:54 2017 +0300 Committer: max-orlov <[email protected]> Committed: Wed May 3 14:26:44 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/api/task.py | 51 +++++--------------- aria/orchestrator/workflows/builtin/heal.py | 4 +- aria/orchestrator/workflows/builtin/install.py | 6 +-- .../orchestrator/workflows/builtin/uninstall.py | 6 +-- .../orchestrator/workflows/builtin/workflows.py | 25 ++++++++++ .../workflows/core/events_handler.py | 5 -- aria/orchestrator/workflows/core/task.py | 16 ++---- aria/orchestrator/workflows/executor/base.py | 12 +++++ aria/orchestrator/workflows/executor/dry.py | 14 ------ .../profiles/tosca-simple-1.0/interfaces.yaml | 13 +++-- 10 files changed, 70 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 8fce8c1..cb1618c 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -104,15 +104,15 @@ class OperationTask(BaseTask): def __repr__(self): return self.name - def __new__(cls, actor, interface_name, operation_name, *args, **kwargs): - """ - Returns a new operation task if the operation exists in the node, otherwise returns None. - """ - try: - cls.is_empty(actor, interface_name, operation_name) - return super(OperationTask, cls).__new__(cls) - except exceptions.OperationNotFoundException: - return None + # def __new__(cls, actor, interface_name, operation_name, *args, **kwargs): + # """ + # Returns a new operation task if the operation exists in the node, otherwise returns None. + # """ + # try: + # cls.is_empty(actor, interface_name, operation_name) + # return super(OperationTask, cls).__new__(cls) + # except exceptions.OperationNotFoundException: + # return None @staticmethod def is_empty(actor, interface_name, operation_name): @@ -170,7 +170,7 @@ def create_relationships_tasks( """ sub_tasks = [] for relationship in node.outbound_relationships: - relationship_operations = relationship_tasks( + relationship_operations = create_relationship_tasks( relationship, interface_name, source_operation_name=source_operation_name, @@ -180,8 +180,8 @@ def create_relationships_tasks( return sub_tasks -def relationship_tasks(relationship, interface_name, source_operation_name=None, - target_operation_name=None, **kwargs): +def create_relationship_tasks(relationship, interface_name, source_operation_name=None, + target_operation_name=None, **kwargs): """ Creates a relationship task source and target. :param Relationship relationship: the relationship instance itself @@ -210,29 +210,4 @@ def relationship_tasks(relationship, interface_name, source_operation_name=None, ) ) - return [op for op in operations if op] - - -def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): - """ - Creates dependencies between tasks if there is a relationship (outbound) between their nodes. - """ - - def get_task(node_name): - for task, node in tasks_and_nodes: - if node.name == node_name: - return task - return None - - for task, node in tasks_and_nodes: - dependencies = [] - for relationship in node.outbound_relationships: - dependency = get_task(relationship.target_node.name) - if dependency: - dependencies.append(dependency) - if dependencies: - if reverse: - for dependency in dependencies: - graph.add_dependency(dependency, task) - else: - graph.add_dependency(task, dependencies) + return operations http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py index 8c76f6c..ca382e8 100644 --- a/aria/orchestrator/workflows/builtin/heal.py +++ b/aria/orchestrator/workflows/builtin/heal.py @@ -103,7 +103,7 @@ def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes): graph.add_dependency(target_node_subgraph, node_sub_workflow) if target_node in failing_nodes: - dependency = task.relationship_tasks( + dependency = task.create_relationship_tasks( relationship=relationship, operation_name='aria.interfaces.relationship_lifecycle.unlink') graph.add_tasks(*dependency) @@ -157,7 +157,7 @@ def heal_install(ctx, graph, failing_nodes, targeted_nodes): graph.add_dependency(node_sub_workflow, target_node_subworkflow) if target_node in failing_nodes: - dependent = task.relationship_tasks( + dependent = task.create_relationship_tasks( relationship=relationship, operation_name='aria.interfaces.relationship_lifecycle.establish') graph.add_tasks(*dependent) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py index c4ab16e..821b190 100644 --- a/aria/orchestrator/workflows/builtin/install.py +++ b/aria/orchestrator/workflows/builtin/install.py @@ -17,15 +17,15 @@ Builtin install workflow """ -from .workflows import install_node from ... import workflow from ..api import task as api_task +from . import workflows @workflow def install(ctx, graph): tasks_and_nodes = [] for node in ctx.nodes: - tasks_and_nodes.append((api_task.WorkflowTask(install_node, node=node), node)) + tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node)) graph.add_tasks([task for task, _ in tasks_and_nodes]) - api_task.create_node_task_dependencies(graph, tasks_and_nodes) + workflows.create_node_task_dependencies(graph, tasks_and_nodes) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py index 920dabf..c35117e 100644 --- a/aria/orchestrator/workflows/builtin/uninstall.py +++ b/aria/orchestrator/workflows/builtin/uninstall.py @@ -17,15 +17,15 @@ Builtin uninstall workflow """ -from .workflows import uninstall_node from ... import workflow from ..api import task as api_task +from . import workflows @workflow def uninstall(ctx, graph): tasks_and_nodes = [] for node in ctx.nodes: - tasks_and_nodes.append((api_task.WorkflowTask(uninstall_node, node=node), node)) + tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node)) graph.add_tasks([task for task, _ in tasks_and_nodes]) - api_task.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) + workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py index 3b3c1ec..c2db20b 100644 --- a/aria/orchestrator/workflows/builtin/workflows.py +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -122,3 +122,28 @@ def _create_stop_tasks(node): NORMATIVE_CONFIGURE_INTERFACE, NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET) return sequence + + +def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): + """ + Creates dependencies between tasks if there is a relationship (outbound) between their nodes. + """ + + def get_task(node_name): + for task, node in tasks_and_nodes: + if node.name == node_name: + return task + return None + + for task, node in tasks_and_nodes: + dependencies = [] + for relationship in node.outbound_relationships: + dependency = get_task(relationship.target_node.name) + if dependency: + dependencies.append(dependency) + if dependencies: + if reverse: + for dependency in dependencies: + graph.add_dependency(dependency, task) + else: + graph.add_dependency(task, dependencies) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 83b79d5..48356ec 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -41,11 +41,6 @@ def _task_started(task, *args, **kwargs): task.started_at = datetime.utcnow() task.status = task.STARTED [email protected]_task_signal.connect -def _node_task_started(task, *args, **kwargs): - with task._update(): - _update_node_state_if_necessary(task, is_transitional=True) - @events.on_failure_task_signal.connect def _task_failed(task, exception, *args, **kwargs): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 181e47b..05e3365 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -28,7 +28,7 @@ from functools import ( from ....modeling import models from ...context import operation as operation_context from .. import exceptions -from ..executor import dry +from ..executor import base def _locked(func=None): @@ -70,11 +70,9 @@ class StubTask(BaseTask): or sub-workflow """ def __init__(self, *args, **kwargs): - super(StubTask, self).__init__(executor=dry.MarkerExecutor(), *args, **kwargs) + super(StubTask, self).__init__(executor=base.StubExecutor(), *args, **kwargs) self.status = models.Task.PENDING self.due_at = datetime.utcnow() - self.started_at = None - self.ended_at = None def has_ended(self): return self.status in (models.Task.SUCCESS, models.Task.FAILED) @@ -83,11 +81,9 @@ class StubTask(BaseTask): return self.status in (models.Task.PENDING, models.Task.RETRYING) def end(self): - self.ended_at = datetime.utcnow() self.status = models.Task.SUCCESS def start(self): - self.started_at = datetime.utcnow() self.status = models.Task.STARTED @@ -127,7 +123,7 @@ class OperationTask(BaseTask): # If no executor is provided, we defer that this is a stub task which does not need to be # executed. super(OperationTask, self).__init__( - id=api_task.id, executor=executor or dry.StubExecutor(), *args, **kwargs) + id=api_task.id, executor=executor or base.EmptyOperationExecutor(), *args, **kwargs) self._workflow_context = api_task._workflow_context self.interface_name = api_task.interface_name @@ -146,12 +142,6 @@ class OperationTask(BaseTask): raise RuntimeError('No operation context could be created for {actor.model_cls}' .format(actor=api_task.actor)) - # TODO: this executor should be put into the task (if no executor was setup in the - # operation) - # executor = '{module}.{name}'.format(module=self._executor.__module__, - # name=self._executor.__class__.__name__ - # ) - task_model = create_task_model( name=api_task.name, actor=api_task.actor, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index f11a6b7..f5109d7 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -54,3 +54,15 @@ class BaseExecutor(logger.LoggerMixin): @staticmethod def _task_succeeded(task): events.on_success_task_signal.send(task) + + +class StubExecutor(BaseExecutor): + def execute(self, task): + task.start() + task.end() + + +class EmptyOperationExecutor(BaseExecutor): + def execute(self, task): + events.start_task_signal.send(task) + events.on_success_task_signal.send(task) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index 55d8f98..7da48f3 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -21,20 +21,6 @@ from aria.orchestrator import events from .base import BaseExecutor -# TODO: the name of this module should definitely change - -class MarkerExecutor(BaseExecutor): - def execute(self, task): - task.start() - task.end() - - -class StubExecutor(BaseExecutor): - def execute(self, task): - events.start_task_signal.send(task) - events.on_success_task_signal.send(task) - - class DryExecutor(BaseExecutor): """ Executor which dry runs tasks - prints task information without causing any side effects http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/71228058/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml index ff6ba6c..a3021f9 100644 --- a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml +++ b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml @@ -82,14 +82,14 @@ interface_types: relationship_edge: target add_target: description: >- - Operation to notify the source node of a target node being added via a relationship. + Operation to notify the target node of a target node being added via a relationship. _extensions: - relationship_edge: source + relationship_edge: target add_source: description: >- - Operation to notify the target node of a source node which is now available via a relationship. + Operation to notify the source node of a source node which is now available via a relationship. _extensions: - relationship_edge: target + relationship_edge: source target_changed: description: >- Operation to notify source some property or attribute of the target changed @@ -99,4 +99,9 @@ interface_types: description: >- Operation to remove a target node. _extensions: + relationship_edge: target + remove_source: + description: >- + Operation to remove a source node. + _extensions: relationship_edge: source
