Repository: incubator-ariatosca
Updated Branches:
  refs/heads/operation_graph ee1a69170 -> 8d44901fc (forced update)


Add transcription mechanism from task_graph into execution_graph


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8d44901f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8d44901f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8d44901f

Branch: refs/heads/operation_graph
Commit: 8d44901fcb5f6a9405ac4167c6b2aea681c9aef0
Parents: b53aa1f
Author: mxmrlv <mxm...@gmail.com>
Authored: Thu Oct 13 19:52:35 2016 +0300
Committer: mxmrlv <mxm...@gmail.com>
Committed: Tue Oct 18 11:33:38 2016 +0300

----------------------------------------------------------------------
 aria/workflows/engine/engine.py                 | 98 ++++++++++++++------
 aria/workflows/engine/tasks.py                  | 72 ++++++++++++++
 tests/engine/__init__.py                        | 38 ++++++++
 .../test_task_graph_into_exececution_graph.py   | 85 +++++++++++++++++
 tests/requirements.txt                          |  1 +
 5 files changed, 267 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
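
For context, here is a minimal usage sketch (not part of this commit) of the new
transcription. It follows the pattern of the new test added below and uses the mock
helpers this commit adds under tests/engine (it assumes the tests package is importable,
e.g. when run from the repository root); the workflow and operation names are made up
for illustration.

    import mock
    from networkx import topological_sort

    from aria.workflows.api.tasks_graph import TaskGraph
    from aria.workflows.engine.engine import Engine
    from tests.engine import MockExecutor, MockOperationContext, MockOperationTask

    # Build a two-operation task graph: 'second_op' depends on 'first_op'.
    graph = TaskGraph('sketch_workflow')
    first = MockOperationContext('first_op')
    second = MockOperationContext('second_op')
    graph.add_task(first)
    graph.add_task(second)
    graph.dependency(second, [first])

    # Patch OperationTask with the storage-less mock, as the new test does.
    with mock.patch('aria.workflows.engine.engine.OperationTask', new=MockOperationTask):
        engine = Engine(executor=MockExecutor(),
                        workflow_context=mock.MagicMock(),
                        tasks_graph=graph)

    # The Engine constructor transcribes the task graph into a networkx DiGraph,
    # wrapped between 'Starting.<name>' and 'Ending.<name>' marker tasks:
    for task_name in topological_sort(engine._execution_graph):
        print task_name
    # Expected order:
    #   Starting.sketch_workflow, first_op, second_op, Ending.sketch_workflow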


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/aria/workflows/engine/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py
index 7b86fb5..4229fe9 100644
--- a/aria/workflows/engine/engine.py
+++ b/aria/workflows/engine/engine.py
@@ -28,6 +28,16 @@ from aria.events import (
     on_failure_task_signal,
 )
 from aria.logger import LoggerMixin
+from aria.contexts import OperationContext
+
+
+from .tasks import (
+    StartWorkflowTask,
+    EndWorkflowTask,
+    StartSubWorkflowTask,
+    EndSubWorkflowTask,
+    OperationTask,
+)
 
 
 class Engine(LoggerMixin):
@@ -38,10 +48,67 @@ class Engine(LoggerMixin):
         self._tasks_graph = tasks_graph
         self._execution_graph = DiGraph()
         self._executor = executor
-        self._build_execution_graph(self._workflow_context, self._tasks_graph)
-
-    def _build_execution_graph(self, workflow_context, graph):
-        pass
+        self._start_task_name = 'Starting.{0}'.format
+        self._end_task_name = 'Ending.{0}'.format
+        self._build_execution_graph(self._tasks_graph)
+
+    def _build_execution_graph(
+            self,
+            graph,
+            StartClass=StartWorkflowTask,
+            EndClass=EndWorkflowTask,
+            depends_on=()):
+
+        # Insert start marker
+        start_task = StartClass(name=self._start_task_name(graph.name), context=self._workflow_context)
+        self._add_task_and_dependencies(start_task, depends_on)
+
+        for operation_or_workflow, dependencies in graph.task_tree(reverse=True):
+            operation_dependencies = self._get_tasks_from_dependencies(dependencies, default=[start_task])
+
+            if self._is_operation(operation_or_workflow):
+                # Add the task and its dependencies
+                operation_task = OperationTask(
+                    name=operation_or_workflow.name,
+                    context=operation_or_workflow,
+                    **operation_or_workflow.engine_options
+                )
+                self._add_task_and_dependencies(operation_task, operation_dependencies)
+            else:
+                # Build the graph recursively while adding start and end markers
+                self._build_execution_graph(
+                    graph=operation_or_workflow,
+                    StartClass=StartSubWorkflowTask,
+                    EndClass=EndSubWorkflowTask,
+                    depends_on=operation_dependencies
+                )
+
+        # Insert end marker
+        workflow_dependencies = self._get_tasks_from_dependencies(graph.leaf_tasks, default=[start_task])
+        end_task = EndClass(name=self._end_task_name(graph.name), context=self._workflow_context)
+        self._add_task_and_dependencies(end_task, workflow_dependencies)
+
+    def _add_task_and_dependencies(self, operation_task, operation_dependencies=()):
+        self._execution_graph.add_node(operation_task.name, task=operation_task)
+        for dependency in operation_dependencies:
+            self._execution_graph.add_edge(dependency.name, operation_task.name)
+
+    def _get_tasks_from_dependencies(self, dependencies, default=()):
+        """
+        Returns task list from dependencies.
+        """
+        tasks = [self._execution_graph.node[
+                    dependency.name
+                    if self._is_operation(dependency) else
+                    # The task is an entire subgraph, so we need to check that the wrapper task has been reached
+                    self._end_task_name(dependency.name)]['task']
+                 for dependency in dependencies]
+
+        return tasks or default
+
+    @staticmethod
+    def _is_operation(task):
+        return isinstance(task, OperationContext)
 
     def execute(self):
         execution_id = self._workflow_context.execution_id
@@ -160,26 +227,3 @@ class Engine(LoggerMixin):
             start_task_signal.disconnect(self._task_started_receiver)
             on_success_task_signal.disconnect(self._task_succeeded_receiver)
             on_failure_task_signal.disconnect(self._task_failed_receiver)
-
-
-class Task(object):
-
-    def __init__(self, operation_context):
-        self.operation_context = operation_context
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        Operation = self.operation_context.storage.operation.model_cls
-        operation = Operation(
-            id=self.operation_context.id,
-            execution_id=self.operation_context.execution_id,
-            max_retries=self.operation_context.parameters.get('max_retries', 1),
-            status=Operation.PENDING,
-        )
-        self.operation_context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.operation_context, attr)
-        except AttributeError:
-            return super(Task, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/aria/workflows/engine/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py
new file mode 100644
index 0000000..1403ce5
--- /dev/null
+++ b/aria/workflows/engine/tasks.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class BaseTask(object):
+
+    def __init__(self, name, context):
+        self.name = name
+        self.context = context
+
+
+class MarkerTask(BaseTask):
+    pass
+
+
+class WorkflowMarkerTask(MarkerTask):
+    pass
+
+
+class StartWorkflowTask(WorkflowMarkerTask):
+    pass
+
+
+class EndWorkflowTask(WorkflowMarkerTask):
+    pass
+
+
+class SubWorkflowMarkerTask(MarkerTask):
+    pass
+
+
+class StartSubWorkflowTask(SubWorkflowMarkerTask):
+    pass
+
+
+class EndSubWorkflowTask(SubWorkflowMarkerTask):
+    pass
+
+
+class OperationTask(BaseTask):
+    def __init__(self, *args, **kwargs):
+        super(OperationTask, self).__init__(*args, **kwargs)
+        self._create_operation_in_storage()
+
+    def _create_operation_in_storage(self):
+        Operation = self.context.storage.operation.model_cls
+        operation = Operation(
+            id=self.context.id,
+            execution_id=self.context.execution_id,
+            max_retries=self.context.parameters.get('max_retries', 1),
+            status=Operation.PENDING,
+        )
+        self.context.operation = operation
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.context, attr)
+        except AttributeError:
+            return super(OperationTask, self).__getattribute__(attr)
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/tests/engine/__init__.py
----------------------------------------------------------------------
diff --git a/tests/engine/__init__.py b/tests/engine/__init__.py
new file mode 100644
index 0000000..e7533fd
--- /dev/null
+++ b/tests/engine/__init__.py
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from uuid import uuid4
+
+from aria.workflows.engine.executor import Executor
+from aria.workflows.engine.tasks import OperationTask
+from aria.contexts import OperationContext
+
+
+class MockExecutor(Executor):
+    def execute(self, task):
+        print task
+
+
+class MockOperationContext(OperationContext):
+    def __init__(self, id=None, **kwargs):
+        self.id = self.name = id or str(uuid4())
+        self.engine_options = {}
+
+    def __repr__(self):
+        return self.id
+
+
+class MockOperationTask(OperationTask):
+    def _create_operation_in_storage(self):
+        pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/tests/engine/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/engine/test_task_graph_into_exececution_graph.py b/tests/engine/test_task_graph_into_exececution_graph.py
new file mode 100644
index 0000000..b087a0f
--- /dev/null
+++ b/tests/engine/test_task_graph_into_exececution_graph.py
@@ -0,0 +1,85 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+import mock
+from networkx import topological_sort
+
+from aria.workflows.engine.engine import Engine
+from aria.workflows.api.tasks_graph import TaskGraph
+from aria.workflows.engine.tasks import (StartWorkflowTask,
+                                         EndWorkflowTask,
+                                         StartSubWorkflowTask,
+                                         EndSubWorkflowTask,
+                                         OperationTask)
+
+from . import MockExecutor, MockOperationContext, MockOperationTask
+
+
+@mock.patch('aria.workflows.engine.engine.OperationTask', new=MockOperationTask)
+def test_task_graph_into_execution_graph():
+    task_graph_name = 'test_task_graph'
+    inner_task_graph_name = 'test_inner_task_graph'
+    simple_before_task_name = 'test_simple_before_task'
+    simple_after_task_name = 'test_simple_after_task'
+    inner_task_name = 'test_inner_task'
+
+    task_graph = TaskGraph(task_graph_name)
+    simple_before_task = MockOperationContext(simple_before_task_name)
+    simple_after_task = MockOperationContext(simple_after_task_name)
+
+    inner_task_graph = TaskGraph(inner_task_graph_name)
+    inner_task = MockOperationContext(inner_task_name)
+    inner_task_graph.add_task(inner_task)
+
+    task_graph.add_task(simple_before_task)
+    task_graph.add_task(simple_after_task)
+    task_graph.add_task(inner_task_graph)
+    task_graph.dependency(inner_task_graph, [simple_before_task])
+    task_graph.dependency(simple_after_task, [inner_task_graph])
+
+    engine = Engine(executor=MockExecutor(),
+                    workflow_context=mock.MagicMock(),
+                    tasks_graph=task_graph)
+
+    # Direct check
+    execution_graph = engine._execution_graph
+    execution_tasks = topological_sort(execution_graph)
+
+    assert len(execution_tasks) == 7
+
+    tasks_names = [
+        'Starting.{0}'.format(task_graph_name),
+        simple_before_task_name,
+        'Starting.{0}'.format(inner_task_graph_name),
+        inner_task_name,
+        'Ending.{0}'.format(inner_task_graph_name),
+        simple_after_task_name,
+        'Ending.{0}'.format(task_graph_name)
+    ]
+
+    assert tasks_names == execution_tasks
+
+    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), StartWorkflowTask)
+    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), StartSubWorkflowTask)
+    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), EndSubWorkflowTask)
+    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), EndWorkflowTask)
+
+
+def _get_task_by_name(task_name, graph):
+    return graph.node[task_name]['task']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 07d82a6..f3443d1 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -4,3 +4,4 @@ tox==1.6.1
 pylint==1.5.5
 pytest==3.0.2
 pytest-cov==2.3.1
+pytest-mock==1.2
