Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-163-Update-node-state-for-stub-tasks 71228058a -> 95255a790 
(forced update)


review0.5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/95255a79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/95255a79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/95255a79

Branch: refs/heads/ARIA-163-Update-node-state-for-stub-tasks
Commit: 95255a79007e9396d69eb64716f2dedee2b4f8f8
Parents: 94cb2a1
Author: max-orlov <[email protected]>
Authored: Wed May 3 14:24:54 2017 +0300
Committer: max-orlov <[email protected]>
Committed: Wed May 3 15:02:27 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/api/task.py         | 51 +++++---------------
 aria/orchestrator/workflows/builtin/heal.py     |  4 +-
 aria/orchestrator/workflows/builtin/install.py  |  6 +--
 .../orchestrator/workflows/builtin/uninstall.py |  6 +--
 .../orchestrator/workflows/builtin/workflows.py | 25 ++++++++++
 .../workflows/core/events_handler.py            |  4 --
 aria/orchestrator/workflows/core/task.py        | 16 ++----
 aria/orchestrator/workflows/executor/base.py    | 12 +++++
 aria/orchestrator/workflows/executor/dry.py     | 14 ------
 .../profiles/tosca-simple-1.0/interfaces.yaml   | 13 +++--
 .../test_task_graph_into_execution_graph.py     |  4 +-
 11 files changed, 72 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py 
b/aria/orchestrator/workflows/api/task.py
index 8fce8c1..cb1618c 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -104,15 +104,15 @@ class OperationTask(BaseTask):
     def __repr__(self):
         return self.name
 
-    def __new__(cls, actor, interface_name, operation_name, *args, **kwargs):
-        """
-        Returns a new operation task if the operation exists in the node, 
otherwise returns None.
-        """
-        try:
-            cls.is_empty(actor, interface_name, operation_name)
-            return super(OperationTask, cls).__new__(cls)
-        except exceptions.OperationNotFoundException:
-            return None
+    # def __new__(cls, actor, interface_name, operation_name, *args, **kwargs):
+    #     """
+    #     Returns a new operation task if the operation exists in the node, 
otherwise returns None.
+    #     """
+    #     try:
+    #         cls.is_empty(actor, interface_name, operation_name)
+    #         return super(OperationTask, cls).__new__(cls)
+    #     except exceptions.OperationNotFoundException:
+    #         return None
 
     @staticmethod
     def is_empty(actor, interface_name, operation_name):
@@ -170,7 +170,7 @@ def create_relationships_tasks(
     """
     sub_tasks = []
     for relationship in node.outbound_relationships:
-        relationship_operations = relationship_tasks(
+        relationship_operations = create_relationship_tasks(
             relationship,
             interface_name,
             source_operation_name=source_operation_name,
@@ -180,8 +180,8 @@ def create_relationships_tasks(
     return sub_tasks
 
 
-def relationship_tasks(relationship, interface_name, 
source_operation_name=None,
-                       target_operation_name=None, **kwargs):
+def create_relationship_tasks(relationship, interface_name, 
source_operation_name=None,
+                              target_operation_name=None, **kwargs):
     """
     Creates a relationship task source and target.
     :param Relationship relationship: the relationship instance itself
@@ -210,29 +210,4 @@ def relationship_tasks(relationship, interface_name, 
source_operation_name=None,
             )
         )
 
-    return [op for op in operations if op]
-
-
-def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
-    """
-    Creates dependencies between tasks if there is a relationship (outbound) 
between their nodes.
-    """
-
-    def get_task(node_name):
-        for task, node in tasks_and_nodes:
-            if node.name == node_name:
-                return task
-        return None
-
-    for task, node in tasks_and_nodes:
-        dependencies = []
-        for relationship in node.outbound_relationships:
-            dependency = get_task(relationship.target_node.name)
-            if dependency:
-                dependencies.append(dependency)
-        if dependencies:
-            if reverse:
-                for dependency in dependencies:
-                    graph.add_dependency(dependency, task)
-            else:
-                graph.add_dependency(task, dependencies)
+    return operations

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/heal.py 
b/aria/orchestrator/workflows/builtin/heal.py
index 8c76f6c..ca382e8 100644
--- a/aria/orchestrator/workflows/builtin/heal.py
+++ b/aria/orchestrator/workflows/builtin/heal.py
@@ -103,7 +103,7 @@ def heal_uninstall(ctx, graph, failing_nodes, 
targeted_nodes):
             graph.add_dependency(target_node_subgraph, node_sub_workflow)
 
             if target_node in failing_nodes:
-                dependency = task.relationship_tasks(
+                dependency = task.create_relationship_tasks(
                     relationship=relationship,
                     
operation_name='aria.interfaces.relationship_lifecycle.unlink')
                 graph.add_tasks(*dependency)
@@ -157,7 +157,7 @@ def heal_install(ctx, graph, failing_nodes, targeted_nodes):
             graph.add_dependency(node_sub_workflow, target_node_subworkflow)
 
             if target_node in failing_nodes:
-                dependent = task.relationship_tasks(
+                dependent = task.create_relationship_tasks(
                     relationship=relationship,
                     
operation_name='aria.interfaces.relationship_lifecycle.establish')
                 graph.add_tasks(*dependent)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/builtin/install.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/install.py 
b/aria/orchestrator/workflows/builtin/install.py
index c4ab16e..821b190 100644
--- a/aria/orchestrator/workflows/builtin/install.py
+++ b/aria/orchestrator/workflows/builtin/install.py
@@ -17,15 +17,15 @@
 Builtin install workflow
 """
 
-from .workflows import install_node
 from ... import workflow
 from ..api import task as api_task
+from . import workflows
 
 
 @workflow
 def install(ctx, graph):
     tasks_and_nodes = []
     for node in ctx.nodes:
-        tasks_and_nodes.append((api_task.WorkflowTask(install_node, 
node=node), node))
+        tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, 
node=node), node))
     graph.add_tasks([task for task, _ in tasks_and_nodes])
-    api_task.create_node_task_dependencies(graph, tasks_and_nodes)
+    workflows.create_node_task_dependencies(graph, tasks_and_nodes)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/builtin/uninstall.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/uninstall.py 
b/aria/orchestrator/workflows/builtin/uninstall.py
index 920dabf..c35117e 100644
--- a/aria/orchestrator/workflows/builtin/uninstall.py
+++ b/aria/orchestrator/workflows/builtin/uninstall.py
@@ -17,15 +17,15 @@
 Builtin uninstall workflow
 """
 
-from .workflows import uninstall_node
 from ... import workflow
 from ..api import task as api_task
+from . import workflows
 
 
 @workflow
 def uninstall(ctx, graph):
     tasks_and_nodes = []
     for node in ctx.nodes:
-        tasks_and_nodes.append((api_task.WorkflowTask(uninstall_node, 
node=node), node))
+        
tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, 
node=node), node))
     graph.add_tasks([task for task, _ in tasks_and_nodes])
-    api_task.create_node_task_dependencies(graph, tasks_and_nodes, 
reverse=True)
+    workflows.create_node_task_dependencies(graph, tasks_and_nodes, 
reverse=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py 
b/aria/orchestrator/workflows/builtin/workflows.py
index 3b3c1ec..1fc9eed 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -122,3 +122,28 @@ def _create_stop_tasks(node):
                                                 NORMATIVE_CONFIGURE_INTERFACE,
                                                 NORMATIVE_REMOVE_SOURCE, 
NORMATIVE_REMOVE_TARGET)
     return sequence
+
+
+def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
+    """
+    Creates dependencies between tasks if there is a relationship (outbound) 
between their nodes.
+    """
+
+    def get_task(node_name):
+        for api_task, task_node in tasks_and_nodes:
+            if task_node.name == node_name:
+                return api_task
+        return None
+
+    for api_task, node in tasks_and_nodes:
+        dependencies = []
+        for relationship in node.outbound_relationships:
+            dependency = get_task(relationship.target_node.name)
+            if dependency:
+                dependencies.append(dependency)
+        if dependencies:
+            if reverse:
+                for dependency in dependencies:
+                    graph.add_dependency(dependency, api_task)
+            else:
+                graph.add_dependency(api_task, dependencies)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py 
b/aria/orchestrator/workflows/core/events_handler.py
index 83b79d5..4d24bb7 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -40,10 +40,6 @@ def _task_started(task, *args, **kwargs):
     with task._update():
         task.started_at = datetime.utcnow()
         task.status = task.STARTED
-
[email protected]_task_signal.connect
-def _node_task_started(task, *args, **kwargs):
-    with task._update():
         _update_node_state_if_necessary(task, is_transitional=True)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py 
b/aria/orchestrator/workflows/core/task.py
index 181e47b..05e3365 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -28,7 +28,7 @@ from functools import (
 from ....modeling import models
 from ...context import operation as operation_context
 from .. import exceptions
-from ..executor import dry
+from ..executor import base
 
 
 def _locked(func=None):
@@ -70,11 +70,9 @@ class StubTask(BaseTask):
     or sub-workflow
     """
     def __init__(self, *args, **kwargs):
-        super(StubTask, self).__init__(executor=dry.MarkerExecutor(), *args, 
**kwargs)
+        super(StubTask, self).__init__(executor=base.StubExecutor(), *args, 
**kwargs)
         self.status = models.Task.PENDING
         self.due_at = datetime.utcnow()
-        self.started_at = None
-        self.ended_at = None
 
     def has_ended(self):
         return self.status in (models.Task.SUCCESS, models.Task.FAILED)
@@ -83,11 +81,9 @@ class StubTask(BaseTask):
         return self.status in (models.Task.PENDING, models.Task.RETRYING)
 
     def end(self):
-        self.ended_at = datetime.utcnow()
         self.status = models.Task.SUCCESS
 
     def start(self):
-        self.started_at = datetime.utcnow()
         self.status = models.Task.STARTED
 
 
@@ -127,7 +123,7 @@ class OperationTask(BaseTask):
         # If no executor is provided, we defer that this is a stub task which 
does not need to be
         # executed.
         super(OperationTask, self).__init__(
-            id=api_task.id, executor=executor or dry.StubExecutor(), *args, 
**kwargs)
+            id=api_task.id, executor=executor or 
base.EmptyOperationExecutor(), *args, **kwargs)
 
         self._workflow_context = api_task._workflow_context
         self.interface_name = api_task.interface_name
@@ -146,12 +142,6 @@ class OperationTask(BaseTask):
             raise RuntimeError('No operation context could be created for 
{actor.model_cls}'
                                .format(actor=api_task.actor))
 
-        # TODO: this executor should be put into the task (if no executor was 
setup in the
-        # operation)
-        # executor = '{module}.{name}'.format(module=self._executor.__module__,
-        #                                     
name=self._executor.__class__.__name__
-        #                                    )
-
         task_model = create_task_model(
             name=api_task.name,
             actor=api_task.actor,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py 
b/aria/orchestrator/workflows/executor/base.py
index f11a6b7..3a3a1fe 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -54,3 +54,15 @@ class BaseExecutor(logger.LoggerMixin):
     @staticmethod
     def _task_succeeded(task):
         events.on_success_task_signal.send(task)
+
+
+class StubExecutor(BaseExecutor):
+    def execute(self, task):
+        task.start()
+        task.end()
+
+
+class EmptyOperationExecutor(BaseExecutor):
+    def execute(self, task):
+        events.start_task_signal.send(task)
+        events.on_success_task_signal.send(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py 
b/aria/orchestrator/workflows/executor/dry.py
index 55d8f98..7da48f3 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -21,20 +21,6 @@ from aria.orchestrator import events
 from .base import BaseExecutor
 
 
-# TODO: the name of this module should definitely change
-
-class MarkerExecutor(BaseExecutor):
-    def execute(self, task):
-        task.start()
-        task.end()
-
-
-class StubExecutor(BaseExecutor):
-    def execute(self, task):
-        events.start_task_signal.send(task)
-        events.on_success_task_signal.send(task)
-
-
 class DryExecutor(BaseExecutor):
     """
     Executor which dry runs tasks - prints task information without causing 
any side effects

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
----------------------------------------------------------------------
diff --git 
a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml 
b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
index ff6ba6c..a3021f9 100644
--- a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
+++ b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
@@ -82,14 +82,14 @@ interface_types:
         relationship_edge: target
     add_target:
       description: >-
-        Operation to notify the source node of a target node being added via a 
relationship.
+        Operation to notify the target node of a target node being added via a 
relationship.
       _extensions:
-        relationship_edge: source
+        relationship_edge: target
     add_source:
       description: >-
-        Operation to notify the target node of a source node which is now 
available via a relationship.
+        Operation to notify the source node of a source node which is now 
available via a relationship.
       _extensions:
-        relationship_edge: target
+        relationship_edge: source
     target_changed:
       description: >-
         Operation to notify source some property or attribute of the target 
changed
@@ -99,4 +99,9 @@ interface_types:
       description: >-
         Operation to remove a target node.
       _extensions:
+        relationship_edge: target
+    remove_source:
+      description: >-
+        Operation to remove a source node.
+      _extensions:
         relationship_edge: source

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/95255a79/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git 
a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py 
b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index c39f453..5dc1206 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -17,7 +17,7 @@ from networkx import topological_sort, DiGraph
 
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api, core
-from aria.orchestrator.workflows.executor import dry
+from aria.orchestrator.workflows.executor import base
 
 from tests import mock
 from tests import storage
@@ -68,7 +68,7 @@ def test_task_graph_into_execution_graph(tmpdir):
     execution_graph = DiGraph()
     core.translation.build_execution_graph(task_graph=test_task_graph,
                                            execution_graph=execution_graph,
-                                           executor=dry.MarkerExecutor())
+                                           executor=base.StubExecutor())
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7

Reply via email to