Repository: incubator-ariatosca
Updated Branches:
  refs/heads/operation_graph b53aa1f3b -> 8f83ce939


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8f83ce93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8f83ce93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8f83ce93

Branch: refs/heads/operation_graph
Commit: 8f83ce9393ca5d57c719986ef78efc36031b1116
Parents: b53aa1f
Author: mxmrlv <mxm...@gmail.com>
Authored: Thu Oct 13 19:52:35 2016 +0300
Committer: mxmrlv <mxm...@gmail.com>
Committed: Thu Oct 13 19:52:35 2016 +0300

----------------------------------------------------------------------
 aria/workflows/engine/engine.py | 99 ++++++++++++++++++++++++++----------
 aria/workflows/engine/tasks.py  | 72 ++++++++++++++++++++++++++
 2 files changed, 144 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8f83ce93/aria/workflows/engine/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py
index 7b86fb5..34aee58 100644
--- a/aria/workflows/engine/engine.py
+++ b/aria/workflows/engine/engine.py
@@ -28,6 +28,16 @@ from aria.events import (
     on_failure_task_signal,
 )
 from aria.logger import LoggerMixin
+from aria.contexts import OperationContext
+
+
+from .tasks import (
+    StartWorkflowTask,
+    EndWorkflowTask,
+    StartSubWorkflowTask,
+    EndSubWorkflowTask,
+    OperationTask,
+)
 
 
 class Engine(LoggerMixin):
@@ -38,10 +48,68 @@ class Engine(LoggerMixin):
         self._tasks_graph = tasks_graph
         self._execution_graph = DiGraph()
         self._executor = executor
-        self._build_execution_graph(self._workflow_context, self._tasks_graph)
-
-    def _build_execution_graph(self, workflow_context, graph):
-        pass
+        self._start_task_name = 'Starting.{0}'.format
+        self._end_task_name = 'Ending.{0}'.format
+        self._build_execution_graph(self._tasks_graph)
+
+    def _build_execution_graph(
+            self,
+            graph,
+            StartClass=StartWorkflowTask,
+            EndClass=EndWorkflowTask,
+            depends_on=()):
+
+        # Insert start marker
+        start_task = StartClass(task_name=self._start_task_name(graph.name), 
context=self._workflow_context)
+        self._add_task_and_dependencies(start_task, depends_on)
+
+        for operation_or_workflow, dependencies in 
graph.task_tree(reverse=True):
+            operation_dependencies = 
self._get_tasks_from_dependencies(dependencies, default=[start_task])
+
+            if self._is_operation(operation_or_workflow):
+                # Add the task an the dependencies
+                operation_task = OperationTask(
+                    task_name=operation_or_workflow.name,
+                    context=operation_or_workflow,
+                    **operation_or_workflow.engine_options
+                )
+                self._add_task_and_dependencies(operation_task, 
operation_dependencies)
+            else:
+                # Built the graph recursively while adding start and end 
markers
+                self._build_execution_graph(
+                    graph=operation_or_workflow,
+                    StartClass=StartSubWorkflowTask,
+                    EndClass=EndSubWorkflowTask,
+                    depends_on=operation_dependencies
+                )
+
+        # Insert end marker
+        workflow_dependencies = 
self._get_tasks_from_dependencies(graph.leaf_tasks, default=[start_task])
+        end_task = EndClass(task_name=self._end_task_name(graph.name), 
context=self._workflow_context)
+        self._add_task_and_dependencies(end_task, workflow_dependencies)
+
+    def _add_task_and_dependencies(self, operation_task, 
operation_dependencies=()):
+        self._execution_graph.add_node(operation_task)
+        for dependency in operation_dependencies:
+            
self._execution_graph.add_edge(self._execution_graph[dependency.task_name],
+                                           
self._execution_graph[operation_task.task_name])
+
+    def _get_tasks_from_dependencies(self, dependencies, default=()):
+        """
+        Returns task list from dependencies.
+        """
+        return [
+            self._workflow_tasks[
+                context.name
+                if self._is_operation(context) else
+                self._end_task_name(context.name)
+            ]
+            for context in dependencies
+        ] or default
+
+    @staticmethod
+    def _is_operation(task):
+        return isinstance(task, OperationContext)
 
     def execute(self):
         execution_id = self._workflow_context.execution_id
@@ -160,26 +228,3 @@ class Engine(LoggerMixin):
             start_task_signal.disconnect(self._task_started_receiver)
             on_success_task_signal.disconnect(self._task_succeeded_receiver)
             on_failure_task_signal.disconnect(self._task_failed_receiver)
-
-
-class Task(object):
-
-    def __init__(self, operation_context):
-        self.operation_context = operation_context
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        Operation = self.operation_context.storage.operation.model_cls
-        operation = Operation(
-            id=self.operation_context.id,
-            execution_id=self.operation_context.execution_id,
-            max_retries=self.operation_context.parameters.get('max_retries', 
1),
-            status=Operation.PENDING,
-        )
-        self.operation_context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.operation_context, attr)
-        except AttributeError:
-            return super(Task, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8f83ce93/aria/workflows/engine/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py
new file mode 100644
index 0000000..688b38b
--- /dev/null
+++ b/aria/workflows/engine/tasks.py
@@ -0,0 +1,72 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class BaseTask(object):
+
+    def __init__(self, task_name, context, depends_on):
+        self.task_name = task_name
+        self.context = context
+
+
+class MarkerTask(BaseTask):
+    pass
+
+
+class WorkflowMarkerTask(MarkerTask):
+    pass
+
+
+class StartWorkflowTask(MarkerTask):
+    pass
+
+
+class EndWorkflowTask(MarkerTask):
+    pass
+
+
+class SubWorkflowMarkerTask(MarkerTask):
+    pass
+
+
+class StartSubWorkflowTask(SubWorkflowMarkerTask):
+    pass
+
+
+class EndSubWorkflowTask(SubWorkflowMarkerTask):
+    pass
+
+
+class OperationTask(BaseTask):
+    def __init__(self, *args, **kwargs):
+        super(OperationTask, self).__init__(*args, **kwargs)
+        self._create_operation_in_storage()
+
+    def _create_operation_in_storage(self):
+        Operation = self.operation_context.storage.operation.model_cls
+        operation = Operation(
+            id=self.operation_context.id,
+            execution_id=self.operation_context.execution_id,
+            max_retries=self.operation_context.parameters.get('max_retries', 
1),
+            status=Operation.PENDING,
+        )
+        self.operation_context.operation = operation
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.operation_context, attr)
+        except AttributeError:
+            return super(OperationTask, self).__getattribute__(attr)
+

Reply via email to