Repository: incubator-ariatosca
Updated Branches:
  refs/heads/operation_graph 8d44901fc -> 4a053b2c3


code review fixups


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4a053b2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4a053b2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4a053b2c

Branch: refs/heads/operation_graph
Commit: 4a053b2c328146df00d830eda65b3f83c4c3682e
Parents: 8d44901
Author: mxmrlv <mxm...@gmail.com>
Authored: Tue Oct 18 15:30:51 2016 +0300
Committer: mxmrlv <mxm...@gmail.com>
Committed: Tue Oct 18 15:48:23 2016 +0300

----------------------------------------------------------------------
 aria/events/__init__.py                         |  6 +-
 aria/workflows/engine/engine.py                 | 75 ++---------------
 aria/workflows/engine/tasks.py                  | 23 ++----
 aria/workflows/engine/transcription.py          | 83 +++++++++++++++++++
 tests/engine/__init__.py                        | 38 ---------
 .../test_task_graph_into_exececution_graph.py   | 85 --------------------
 tests/workflows/__init__.py                     | 14 ++++
 .../test_task_graph_into_exececution_graph.py   | 73 +++++++++++++++++
 8 files changed, 185 insertions(+), 212 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index 70e7e03..b815c7a 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -20,13 +20,13 @@ from blinker import signal
 from ..tools.plugin import plugin_installer
 
 
-# workflow engine default signals:
+# task signals:
 start_task_signal = signal('start_task_signal')
 end_task_signal = signal('end_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
 
-# workflow engine workflow signals:
+# workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
 end_workflow_signal = signal('end_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
@@ -34,7 +34,7 @@ on_failure_workflow_signal = signal('on_failure_workflow_signal')
 start_sub_workflow_signal = signal('start_sub_workflow_signal')
 end_sub_workflow_signal = signal('end_sub_workflow_signal')
 
-# workflow engine operation signals:
+# operation signals:
 start_operation_signal = signal('start_operation_signal')
 end_operation_signal = signal('end_operation_signal')
 on_success_operation_signal = signal('on_success_operation_signal')
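
For context, a brief sketch (not part of this commit) of how these blinker signals are typically consumed; the receiver and the emitted payload below are hypothetical, only the signal names and the standard blinker connect/send API come from the module above:

    from aria.events import start_task_signal

    def log_task_start(sender, **kwargs):
        # sender and kwargs are whatever the engine passes to send()
        print('task started by {0}: {1}'.format(sender, kwargs))

    start_task_signal.connect(log_task_start)
    # the engine side would then emit something like:
    # start_task_signal.send('engine', task_id='some-task')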

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/aria/workflows/engine/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py
index 4229fe9..2a5afce 100644
--- a/aria/workflows/engine/engine.py
+++ b/aria/workflows/engine/engine.py
@@ -28,16 +28,11 @@ from aria.events import (
     on_failure_task_signal,
 )
 from aria.logger import LoggerMixin
-from aria.contexts import OperationContext
 
+from .transcription import build_execution_graph
 
-from .tasks import (
-    StartWorkflowTask,
-    EndWorkflowTask,
-    StartSubWorkflowTask,
-    EndSubWorkflowTask,
-    OperationTask,
-)
+
+from ...storage import Model
 
 
 class Engine(LoggerMixin):
@@ -48,67 +43,9 @@ class Engine(LoggerMixin):
         self._tasks_graph = tasks_graph
         self._execution_graph = DiGraph()
         self._executor = executor
-        self._start_task_name = 'Starting.{0}'.format
-        self._end_task_name = 'Ending.{0}'.format
-        self._build_execution_graph(self._tasks_graph)
-
-    def _build_execution_graph(
-            self,
-            graph,
-            StartClass=StartWorkflowTask,
-            EndClass=EndWorkflowTask,
-            depends_on=()):
-
-        # Insert start marker
-        start_task = StartClass(name=self._start_task_name(graph.name), context=self._workflow_context)
-        self._add_task_and_dependencies(start_task, depends_on)
-
-        for operation_or_workflow, dependencies in graph.task_tree(reverse=True):
-            operation_dependencies = self._get_tasks_from_dependencies(dependencies, default=[start_task])
-
-            if self._is_operation(operation_or_workflow):
-                # Add the task an the dependencies
-                operation_task = OperationTask(
-                    name=operation_or_workflow.name,
-                    context=operation_or_workflow,
-                    **operation_or_workflow.engine_options
-                )
-                self._add_task_and_dependencies(operation_task, operation_dependencies)
-            else:
-                # Built the graph recursively while adding start and end markers
-                self._build_execution_graph(
-                    graph=operation_or_workflow,
-                    StartClass=StartSubWorkflowTask,
-                    EndClass=EndSubWorkflowTask,
-                    depends_on=operation_dependencies
-                )
-
-        # Insert end marker
-        workflow_dependencies = self._get_tasks_from_dependencies(graph.leaf_tasks, default=[start_task])
-        end_task = EndClass(name=self._end_task_name(graph.name), context=self._workflow_context)
-        self._add_task_and_dependencies(end_task, workflow_dependencies)
-
-    def _add_task_and_dependencies(self, operation_task, operation_dependencies=()):
-        self._execution_graph.add_node(operation_task.name, task=operation_task)
-        for dependency in operation_dependencies:
-            self._execution_graph.add_edge(dependency.name, operation_task.name)
-
-    def _get_tasks_from_dependencies(self, dependencies, default=()):
-        """
-        Returns task list from dependencies.
-        """
-        tasks = [self._execution_graph.node[
-                    dependency.name
-                    if self._is_operation(dependency) else
-                    # The task is an entire subgraph, so we need to check that the wrapper task has been reached
-                    self._end_task_name(dependency.name)]['task']
-                 for dependency in dependencies]
-
-        return tasks or default
-
-    @staticmethod
-    def _is_operation(task):
-        return isinstance(task, OperationContext)
+        build_execution_graph(task_graph=self._tasks_graph,
+                              workflow_context=workflow_context,
+                              execution_graph=self._execution_graph)
 
     def execute(self):
         execution_id = self._workflow_context.execution_id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/aria/workflows/engine/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py
index 1403ce5..e120bdd 100644
--- a/aria/workflows/engine/tasks.py
+++ b/aria/workflows/engine/tasks.py
@@ -16,36 +16,25 @@
 
 class BaseTask(object):
 
-    def __init__(self, name, context):
+    def __init__(self, id, name, context):
+        self.id = id
         self.name = name
         self.context = context
 
 
-class MarkerTask(BaseTask):
+class StartWorkflowTask(BaseTask):
     pass
 
 
-class WorkflowMarkerTask(MarkerTask):
+class EndWorkflowTask(BaseTask):
     pass
 
 
-class StartWorkflowTask(MarkerTask):
+class StartSubWorkflowTask(BaseTask):
     pass
 
 
-class EndWorkflowTask(MarkerTask):
-    pass
-
-
-class SubWorkflowMarkerTask(MarkerTask):
-    pass
-
-
-class StartSubWorkflowTask(SubWorkflowMarkerTask):
-    pass
-
-
-class EndSubWorkflowTask(SubWorkflowMarkerTask):
+class EndSubWorkflowTask(BaseTask):
     pass
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/aria/workflows/engine/transcription.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/transcription.py b/aria/workflows/engine/transcription.py
new file mode 100644
index 0000000..fc6138b
--- /dev/null
+++ b/aria/workflows/engine/transcription.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria import contexts
+
+from . import tasks
+
+
+def build_execution_graph(
+        task_graph,
+        workflow_context,
+        execution_graph,
+        start_cls=tasks.StartWorkflowTask,
+        end_cls=tasks.EndWorkflowTask,
+        depends_on=()):
+    # Insert start marker
+    start_task = start_cls(id=_start_graph_id(task_graph.id),
+                           name=_start_graph_id(task_graph.name),
+                           context=workflow_context)
+    _add_task_and_dependencies(execution_graph, start_task, depends_on)
+
+    for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True):
+        operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task])
+
+        if _is_operation(operation_or_workflow):
+            # Add the task and its dependencies
+            operation_task = tasks.OperationTask(id=operation_or_workflow.id,
+                                                 name=operation_or_workflow.name,
+                                                 context=operation_or_workflow)
+            _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
+        else:
+            # Build the graph recursively while adding start and end markers
+            build_execution_graph(
+                task_graph=operation_or_workflow,
+                workflow_context=workflow_context,
+                execution_graph=execution_graph,
+                start_cls=tasks.StartSubWorkflowTask,
+                end_cls=tasks.EndSubWorkflowTask,
+                depends_on=operation_dependencies
+            )
+
+    # Insert end marker
+    workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task])
+    end_task = end_cls(id=_end_graph_id(task_graph.id), name=_end_graph_id(task_graph.name), context=workflow_context)
+    _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
+
+
+def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()):
+    execution_graph.add_node(operation_task.id, task=operation_task)
+    for dependency in operation_dependencies:
+        execution_graph.add_edge(dependency.id, operation_task.id)
+
+
+def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
+    """
+    Returns task list from dependencies.
+    """
+    return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_id(dependency.id)]
+            ['task'] for dependency in dependencies] or default
+
+
+def _is_operation(task):
+    return isinstance(task, contexts.OperationContext)
+
+
+def _start_graph_id(id):
+    return '{0}-Start'.format(id)
+
+
+def _end_graph_id(id):
+    return '{0}-End'.format(id)
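
For reviewers, a minimal usage sketch (not part of this commit) of the new helper; the call mirrors Engine.__init__ above, and the TaskGraph/OperationContext construction is assumed from the updated test further below:

    from networkx import DiGraph, topological_sort

    from aria import contexts
    from aria.workflows.api import tasks_graph
    from aria.workflows.engine import transcription

    graph = tasks_graph.TaskGraph('example_graph')
    graph.add_task(contexts.OperationContext('example_task', {}, {}, None))

    # Building the execution graph instantiates tasks.OperationTask objects;
    # the updated test stubs OperationTask._create_operation_in_storage to
    # avoid touching storage, and a standalone run would need the same.
    execution_graph = DiGraph()
    transcription.build_execution_graph(task_graph=graph,
                                        workflow_context=None,
                                        execution_graph=execution_graph)

    # Node ids are '<graph.id>-Start', the operation ids, then '<graph.id>-End',
    # as produced by _start_graph_id/_end_graph_id above.
    print(topological_sort(execution_graph))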

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/tests/engine/__init__.py
----------------------------------------------------------------------
diff --git a/tests/engine/__init__.py b/tests/engine/__init__.py
deleted file mode 100644
index e7533fd..0000000
--- a/tests/engine/__init__.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from uuid import uuid4
-
-from aria.workflows.engine.executor import Executor
-from aria.workflows.engine.tasks import OperationTask
-from aria.contexts import OperationContext
-
-
-class MockExecutor(Executor):
-    def execute(self, task):
-        print task
-
-
-class MockOperationContext(OperationContext):
-    def __init__(self, id=str(uuid4()), **kwargs):
-        self.id = self.name = id
-        self.engine_options = {}
-
-    def __repr__(self):
-        return self.id
-
-
-class MockOperationTask(OperationTask):
-    def _create_operation_in_storage(self):
-        pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/tests/engine/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/engine/test_task_graph_into_exececution_graph.py b/tests/engine/test_task_graph_into_exececution_graph.py
deleted file mode 100644
index b087a0f..0000000
--- a/tests/engine/test_task_graph_into_exececution_graph.py
+++ /dev/null
@@ -1,85 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import pytest
-import mock
-from networkx import topological_sort
-
-from aria.workflows.engine.engine import Engine
-from aria.workflows.api.tasks_graph import TaskGraph
-from aria.workflows.engine.tasks import (StartWorkflowTask,
-                                         EndWorkflowTask,
-                                         StartSubWorkflowTask,
-                                         EndSubWorkflowTask,
-                                         OperationTask)
-
-from . import MockExecutor, MockOperationContext, MockOperationTask
-
-
-@mock.patch('aria.workflows.engine.engine.OperationTask', new=MockOperationTask)
-def test_task_graph_into_execution_graph():
-    task_graph_name = 'test_task_graph'
-    inner_task_graph_name = 'test_inner_task_graph'
-    simple_before_task_name = 'test_simple_before_task'
-    simple_after_task_name = 'test_simple_after_task'
-    inner_task_name = 'test_inner_task'
-
-    task_graph = TaskGraph(task_graph_name)
-    simple_before_task = MockOperationContext(simple_before_task_name)
-    simple_after_task = MockOperationContext(simple_after_task_name)
-
-    inner_task_graph = TaskGraph(inner_task_graph_name)
-    inner_task = MockOperationContext(inner_task_name)
-    inner_task_graph.add_task(inner_task)
-
-    task_graph.add_task(simple_before_task)
-    task_graph.add_task(simple_after_task)
-    task_graph.add_task(inner_task_graph)
-    task_graph.dependency(inner_task_graph, [simple_before_task])
-    task_graph.dependency(simple_after_task, [inner_task_graph])
-
-    engine = Engine(executor=MockExecutor(),
-                    workflow_context=mock.MagicMock(),
-                    tasks_graph=task_graph)
-
-    # Direct check
-    execution_graph = engine._execution_graph
-    execution_tasks = topological_sort(execution_graph)
-
-    assert len(execution_tasks) == 7
-
-    tasks_names = [
-        'Starting.{0}'.format(task_graph_name),
-        simple_before_task_name,
-        'Starting.{0}'.format(inner_task_graph_name),
-        inner_task_name,
-        'Ending.{0}'.format(inner_task_graph_name),
-        simple_after_task_name,
-        'Ending.{0}'.format(task_graph_name)
-    ]
-
-    assert tasks_names == execution_tasks
-
-    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), StartWorkflowTask)
-    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), StartSubWorkflowTask)
-    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), EndSubWorkflowTask)
-    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), EndWorkflowTask)
-
-
-def _get_task_by_name(task_name, graph):
-    return graph.node[task_name]['task']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/tests/workflows/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/__init__.py b/tests/workflows/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/workflows/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4a053b2c/tests/workflows/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py b/tests/workflows/test_task_graph_into_exececution_graph.py
new file mode 100644
index 0000000..f7007ed
--- /dev/null
+++ b/tests/workflows/test_task_graph_into_exececution_graph.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+from networkx import topological_sort, DiGraph
+
+from aria import contexts
+from aria.workflows.api import tasks_graph
+from aria.workflows.engine import tasks, transcription
+
+
+@pytest.fixture(autouse=True)
+def no_storage(monkeypatch):
+    monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage', value=lambda *args, **kwargs: None)
+
+
+def test_task_graph_into_execution_graph():
+    task_graph = tasks_graph.TaskGraph('test_task_graph')
+    simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None)
+    simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None)
+
+    inner_task_graph = tasks_graph.TaskGraph('test_inner_task_graph')
+    inner_task = contexts.OperationContext('test_inner_task', {}, {}, None)
+    inner_task_graph.add_task(inner_task)
+
+    task_graph.add_task(simple_before_task)
+    task_graph.add_task(simple_after_task)
+    task_graph.add_task(inner_task_graph)
+    task_graph.dependency(inner_task_graph, [simple_before_task])
+    task_graph.dependency(simple_after_task, [inner_task_graph])
+
+    # Direct check
+    execution_graph = DiGraph()
+    transcription.build_execution_graph(task_graph=task_graph, workflow_context=None, execution_graph=execution_graph)
+    execution_tasks = topological_sort(execution_graph)
+
+    assert len(execution_tasks) == 7
+
+    expected_tasks_names = [
+        '{0}-Start'.format(task_graph.id),
+        simple_before_task.id,
+        '{0}-Start'.format(inner_task_graph.id),
+        inner_task.id,
+        '{0}-End'.format(inner_task_graph.id),
+        simple_after_task.id,
+        '{0}-End'.format(task_graph.id)
+    ]
+
+    assert expected_tasks_names == execution_tasks
+
+    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), tasks.StartWorkflowTask)
+    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), tasks.StartSubWorkflowTask)
+    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), tasks.EndSubWorkflowTask)
+    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
+    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask)
+
+
+def _get_task_by_name(task_name, graph):
+    return graph.node[task_name]['task']
