Repository: incubator-ariatosca Updated Branches: refs/heads/operation_graph ee1a69170 -> 8d44901fc (forced update)
Add transcription mechanism from task_graph into execution_graph Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8d44901f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8d44901f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8d44901f Branch: refs/heads/operation_graph Commit: 8d44901fcb5f6a9405ac4167c6b2aea681c9aef0 Parents: b53aa1f Author: mxmrlv <mxm...@gmail.com> Authored: Thu Oct 13 19:52:35 2016 +0300 Committer: mxmrlv <mxm...@gmail.com> Committed: Tue Oct 18 11:33:38 2016 +0300 ---------------------------------------------------------------------- aria/workflows/engine/engine.py | 98 ++++++++++++++------ aria/workflows/engine/tasks.py | 72 ++++++++++++++ tests/engine/__init__.py | 38 ++++++++ .../test_task_graph_into_exececution_graph.py | 85 +++++++++++++++++ tests/requirements.txt | 1 + 5 files changed, 267 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/aria/workflows/engine/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py index 7b86fb5..4229fe9 100644 --- a/aria/workflows/engine/engine.py +++ b/aria/workflows/engine/engine.py @@ -28,6 +28,16 @@ from aria.events import ( on_failure_task_signal, ) from aria.logger import LoggerMixin +from aria.contexts import OperationContext + + +from .tasks import ( + StartWorkflowTask, + EndWorkflowTask, + StartSubWorkflowTask, + EndSubWorkflowTask, + OperationTask, +) class Engine(LoggerMixin): @@ -38,10 +48,67 @@ class Engine(LoggerMixin): self._tasks_graph = tasks_graph self._execution_graph = DiGraph() self._executor = executor - self._build_execution_graph(self._workflow_context, self._tasks_graph) - - def _build_execution_graph(self, workflow_context, graph): - pass + self._start_task_name = 'Starting.{0}'.format + self._end_task_name = 'Ending.{0}'.format + self._build_execution_graph(self._tasks_graph) + + def _build_execution_graph( + self, + graph, + StartClass=StartWorkflowTask, + EndClass=EndWorkflowTask, + depends_on=()): + + # Insert start marker + start_task = StartClass(name=self._start_task_name(graph.name), context=self._workflow_context) + self._add_task_and_dependencies(start_task, depends_on) + + for operation_or_workflow, dependencies in graph.task_tree(reverse=True): + operation_dependencies = self._get_tasks_from_dependencies(dependencies, default=[start_task]) + + if self._is_operation(operation_or_workflow): + # Add the task an the dependencies + operation_task = OperationTask( + name=operation_or_workflow.name, + context=operation_or_workflow, + **operation_or_workflow.engine_options + ) + self._add_task_and_dependencies(operation_task, operation_dependencies) + else: + # Built the graph recursively while adding start and end markers + self._build_execution_graph( + graph=operation_or_workflow, + StartClass=StartSubWorkflowTask, + EndClass=EndSubWorkflowTask, + depends_on=operation_dependencies + ) + + # Insert end marker + workflow_dependencies = self._get_tasks_from_dependencies(graph.leaf_tasks, default=[start_task]) + end_task = EndClass(name=self._end_task_name(graph.name), context=self._workflow_context) + self._add_task_and_dependencies(end_task, workflow_dependencies) + + def _add_task_and_dependencies(self, operation_task, operation_dependencies=()): + self._execution_graph.add_node(operation_task.name, task=operation_task) + for dependency in operation_dependencies: + self._execution_graph.add_edge(dependency.name, operation_task.name) + + def _get_tasks_from_dependencies(self, dependencies, default=()): + """ + Returns task list from dependencies. + """ + tasks = [self._execution_graph.node[ + dependency.name + if self._is_operation(dependency) else + # The task is an entire subgraph, so we need to check that the wrapper task has been reached + self._end_task_name(dependency.name)]['task'] + for dependency in dependencies] + + return tasks or default + + @staticmethod + def _is_operation(task): + return isinstance(task, OperationContext) def execute(self): execution_id = self._workflow_context.execution_id @@ -160,26 +227,3 @@ class Engine(LoggerMixin): start_task_signal.disconnect(self._task_started_receiver) on_success_task_signal.disconnect(self._task_succeeded_receiver) on_failure_task_signal.disconnect(self._task_failed_receiver) - - -class Task(object): - - def __init__(self, operation_context): - self.operation_context = operation_context - self._create_operation_in_storage() - - def _create_operation_in_storage(self): - Operation = self.operation_context.storage.operation.model_cls - operation = Operation( - id=self.operation_context.id, - execution_id=self.operation_context.execution_id, - max_retries=self.operation_context.parameters.get('max_retries', 1), - status=Operation.PENDING, - ) - self.operation_context.operation = operation - - def __getattr__(self, attr): - try: - return getattr(self.operation_context, attr) - except AttributeError: - return super(Task, self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/aria/workflows/engine/tasks.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py new file mode 100644 index 0000000..1403ce5 --- /dev/null +++ b/aria/workflows/engine/tasks.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class BaseTask(object): + + def __init__(self, name, context): + self.name = name + self.context = context + + +class MarkerTask(BaseTask): + pass + + +class WorkflowMarkerTask(MarkerTask): + pass + + +class StartWorkflowTask(MarkerTask): + pass + + +class EndWorkflowTask(MarkerTask): + pass + + +class SubWorkflowMarkerTask(MarkerTask): + pass + + +class StartSubWorkflowTask(SubWorkflowMarkerTask): + pass + + +class EndSubWorkflowTask(SubWorkflowMarkerTask): + pass + + +class OperationTask(BaseTask): + def __init__(self, *args, **kwargs): + super(OperationTask, self).__init__(*args, **kwargs) + self._create_operation_in_storage() + + def _create_operation_in_storage(self): + Operation = self.operation_context.storage.operation.model_cls + operation = Operation( + id=self.operation_context.id, + execution_id=self.operation_context.execution_id, + max_retries=self.operation_context.parameters.get('max_retries', 1), + status=Operation.PENDING, + ) + self.operation_context.operation = operation + + def __getattr__(self, attr): + try: + return getattr(self.operation_context, attr) + except AttributeError: + return super(OperationTask, self).__getattribute__(attr) + http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/tests/engine/__init__.py ---------------------------------------------------------------------- diff --git a/tests/engine/__init__.py b/tests/engine/__init__.py new file mode 100644 index 0000000..e7533fd --- /dev/null +++ b/tests/engine/__init__.py @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from uuid import uuid4 + +from aria.workflows.engine.executor import Executor +from aria.workflows.engine.tasks import OperationTask +from aria.contexts import OperationContext + + +class MockExecutor(Executor): + def execute(self, task): + print task + + +class MockOperationContext(OperationContext): + def __init__(self, id=str(uuid4()), **kwargs): + self.id = self.name = id + self.engine_options = {} + + def __repr__(self): + return self.id + + +class MockOperationTask(OperationTask): + def _create_operation_in_storage(self): + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/tests/engine/test_task_graph_into_exececution_graph.py ---------------------------------------------------------------------- diff --git a/tests/engine/test_task_graph_into_exececution_graph.py b/tests/engine/test_task_graph_into_exececution_graph.py new file mode 100644 index 0000000..b087a0f --- /dev/null +++ b/tests/engine/test_task_graph_into_exececution_graph.py @@ -0,0 +1,85 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import mock +from networkx import topological_sort + +from aria.workflows.engine.engine import Engine +from aria.workflows.api.tasks_graph import TaskGraph +from aria.workflows.engine.tasks import (StartWorkflowTask, + EndWorkflowTask, + StartSubWorkflowTask, + EndSubWorkflowTask, + OperationTask) + +from . import MockExecutor, MockOperationContext, MockOperationTask + + +@mock.patch('aria.workflows.engine.engine.OperationTask', new=MockOperationTask) +def test_task_graph_into_execution_graph(): + task_graph_name = 'test_task_graph' + inner_task_graph_name = 'test_inner_task_graph' + simple_before_task_name = 'test_simple_before_task' + simple_after_task_name = 'test_simple_after_task' + inner_task_name = 'test_inner_task' + + task_graph = TaskGraph(task_graph_name) + simple_before_task = MockOperationContext(simple_before_task_name) + simple_after_task = MockOperationContext(simple_after_task_name) + + inner_task_graph = TaskGraph(inner_task_graph_name) + inner_task = MockOperationContext(inner_task_name) + inner_task_graph.add_task(inner_task) + + task_graph.add_task(simple_before_task) + task_graph.add_task(simple_after_task) + task_graph.add_task(inner_task_graph) + task_graph.dependency(inner_task_graph, [simple_before_task]) + task_graph.dependency(simple_after_task, [inner_task_graph]) + + engine = Engine(executor=MockExecutor(), + workflow_context=mock.MagicMock(), + tasks_graph=task_graph) + + # Direct check + execution_graph = engine._execution_graph + execution_tasks = topological_sort(execution_graph) + + assert len(execution_tasks) == 7 + + tasks_names = [ + 'Starting.{0}'.format(task_graph_name), + simple_before_task_name, + 'Starting.{0}'.format(inner_task_graph_name), + inner_task_name, + 'Ending.{0}'.format(inner_task_graph_name), + simple_after_task_name, + 'Ending.{0}'.format(task_graph_name) + ] + + assert tasks_names == execution_tasks + + assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph), StartWorkflowTask) + assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph), StartSubWorkflowTask) + assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), EndSubWorkflowTask) + assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context + assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), EndWorkflowTask) + + +def _get_task_by_name(task_name, graph): + return graph.node[task_name]['task'] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8d44901f/tests/requirements.txt ---------------------------------------------------------------------- diff --git a/tests/requirements.txt b/tests/requirements.txt index 07d82a6..f3443d1 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -4,3 +4,4 @@ tox==1.6.1 pylint==1.5.5 pytest==3.0.2 pytest-cov==2.3.1 +pytest-mock==1.2