Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution ae2bab3da -> 660139ed7 (forced update)
review 1 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/660139ed Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/660139ed Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/660139ed Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution Commit: 660139ed74405aafa7fa2f6fda6d9591789618aa Parents: 827230d Author: max-orlov <[email protected]> Authored: Sun Jun 25 14:01:56 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jun 25 14:06:42 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflow_runner.py | 4 +- aria/orchestrator/workflows/core/compile.py | 122 ------------------- aria/orchestrator/workflows/core/engine.py | 18 +-- .../workflows/core/graph_compiler.py | 120 ++++++++++++++++++ tests/orchestrator/context/__init__.py | 4 +- tests/orchestrator/context/test_serialize.py | 4 +- .../orchestrator/execution_plugin/test_local.py | 4 +- tests/orchestrator/execution_plugin/test_ssh.py | 5 +- .../orchestrator/workflows/core/test_engine.py | 4 +- .../orchestrator/workflows/core/test_events.py | 4 +- .../test_task_graph_into_execution_graph.py | 8 +- .../executor/test_process_executor_extension.py | 4 +- .../test_process_executor_tracked_changes.py | 4 +- 13 files changed, 152 insertions(+), 153 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index b3f100d..4a50fb2 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -24,7 +24,7 @@ from datetime import datetime from . 
import exceptions from .context.workflow import WorkflowContext from .workflows import builtin -from .workflows.core import engine, compile +from .workflows.core import engine, graph_compiler from .workflows.executor.process import ProcessExecutor from ..modeling import models from ..modeling import utils as modeling_utils @@ -97,7 +97,7 @@ class WorkflowRunner(object): if not self._is_resume: workflow_fn = self._get_workflow_fn() self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - compile.GraphCompiler(self._workflow_context, executor.__class__).compile( + graph_compiler.GraphCompiler(self._workflow_context, executor.__class__).compile( self._tasks_graph) self._engine = engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/aria/orchestrator/workflows/core/compile.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/compile.py b/aria/orchestrator/workflows/core/compile.py deleted file mode 100644 index 83de22c..0000000 --- a/aria/orchestrator/workflows/core/compile.py +++ /dev/null @@ -1,122 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -from ....modeling import models -from .. import executor, api - - -# TODO: is class really needed? - -class GraphCompiler(object): - def __init__(self, ctx, default_executor): - self._ctx = ctx - self._default_executor = default_executor - self._stub_executor = executor.base.StubTaskExecutor - self._model_to_api_id = {} - - def compile(self, - task_graph, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param start_stub_type: internal use - :param end_stub_type: internal use - :param depends_on: internal use - """ - task_graph = task_graph or self._task_graph - depends_on = list(depends_on) - - # Insert start marker - start_task = self._create_stub_task( - start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name, - ) - - for task in task_graph.topological_order(reverse=True): - dependencies = \ - (self._get_tasks_from_dependencies(task_graph.get_dependencies(task)) - or [start_task]) - - if isinstance(task, api.task.OperationTask): - self._create_operation_task(task, dependencies) - - elif isinstance(task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - self.compile( - task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies - ) - elif isinstance(task, api.task.StubTask): - self._create_stub_task(models.Task.STUB, dependencies, task.id) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - self._create_stub_task( - end_stub_type, - self._get_non_dependent_tasks(self._ctx.execution) or [start_task], - self._end_graph_suffix(task_graph.id), - task_graph.name - ) - - def _create_stub_task(self, stub_type, dependencies, api_id, name=None): - model_task = models.Task( - name=name, - dependencies=dependencies, - execution=self._ctx.execution, - _executor=self._stub_executor, - _stub_type=stub_type) - 
self._ctx.model.task.put(model_task) - self._model_to_api_id[model_task.id] = api_id - return model_task - - def _create_operation_task(self, api_task, dependencies): - model_task = models.Task.from_api_task( - api_task, self._default_executor, dependencies=dependencies) - self._ctx.model.task.put(model_task) - self._model_to_api_id[model_task.id] = api_task.id - return model_task - - @staticmethod - def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - @staticmethod - def _end_graph_suffix(api_id): - return '{0}-End'.format(api_id) - - @staticmethod - def _get_non_dependent_tasks(execution): - tasks_with_dependencies = set() - for task in execution.tasks: - tasks_with_dependencies.update(task.dependencies) - return list(set(execution.tasks) - set(tasks_with_dependencies)) - - def _get_tasks_from_dependencies(self, dependencies): - """ - Returns task list from dependencies. - """ - tasks = [] - for dependency in dependencies: - if getattr(dependency, 'actor', False): - # This is - dependency_name = dependency.id - else: - dependency_name = self._end_graph_suffix(dependency.id) - tasks.extend(task for task in self._ctx.execution.tasks - if self._model_to_api_id.get(task.id, None) == dependency_name) - return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index f594e36..d52ae85 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -48,20 +48,20 @@ class Engine(logger.LoggerMixin): if resuming: events.on_resume_workflow_signal.send(ctx) - task_tracker = _TasksTracker(ctx) + tasks_tracker = _TasksTracker(ctx) try: events.start_workflow_signal.send(ctx) while True: cancel = self._is_cancel(ctx) if cancel: break - for task in task_tracker.ended_tasks: + for 
task in tasks_tracker.ended_tasks: self._handle_ended_tasks(task) - task_tracker.finished_(task) - for task in task_tracker.executable_tasks: - task_tracker.executing_(task) + tasks_tracker.finished(task) + for task in tasks_tracker.executable_tasks: + tasks_tracker.executing(task) self._handle_executable_task(ctx, task) - if task_tracker.all_tasks_consumed: + if tasks_tracker.all_tasks_consumed: break else: time.sleep(0.1) @@ -125,13 +125,13 @@ class _TasksTracker(object): def all_tasks_consumed(self): return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 - def executing_(self, task): + def executing(self, task): # Task executing could be retrying (thus removed and added earlier) if task not in self._executing_tasks: self._executable_tasks.remove(task) self._executing_tasks.append(task) - def finished_(self, task): + def finished(self, task): self._executing_tasks.remove(task) self._executed_tasks.append(task) @@ -144,7 +144,7 @@ class _TasksTracker(object): @property def executable_tasks(self): now = datetime.utcnow() - # we need both list since retrying task are in the executing task list. + # we need both lists since retrying task are in the executing task list. for task in self._update_tasks(self._executing_tasks + self._executable_tasks): if all([task.is_waiting(), task.due_at <= now, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/aria/orchestrator/workflows/core/graph_compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py new file mode 100644 index 0000000..f339038 --- /dev/null +++ b/aria/orchestrator/workflows/core/graph_compiler.py @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ....modeling import models +from .. import executor, api + + +class GraphCompiler(object): + def __init__(self, ctx, default_executor): + self._ctx = ctx + self._default_executor = default_executor + self._stub_executor = executor.base.StubTaskExecutor + self._model_to_api_id = {} + + def compile(self, + task_graph, + start_stub_type=models.Task.START_WORKFLOW, + end_stub_type=models.Task.END_WORKFLOW, + depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param start_stub_type: internal use + :param end_stub_type: internal use + :param depends_on: internal use + """ + task_graph = task_graph or self._task_graph + depends_on = list(depends_on) + + # Insert start marker + start_task = self._create_stub_task( + start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name, + ) + + for task in task_graph.topological_order(reverse=True): + dependencies = \ + (self._get_tasks_from_dependencies(task_graph.get_dependencies(task)) + or [start_task]) + + if isinstance(task, api.task.OperationTask): + self._create_operation_task(task, dependencies) + + elif isinstance(task, api.task.WorkflowTask): + # Build the graph recursively while adding start and end markers + self.compile( + task, 
models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies + ) + elif isinstance(task, api.task.StubTask): + self._create_stub_task(models.Task.STUB, dependencies, task.id) + else: + raise RuntimeError('Undefined state') + + # Insert end marker + self._create_stub_task( + end_stub_type, + self._get_non_dependent_tasks(self._ctx.execution) or [start_task], + self._end_graph_suffix(task_graph.id), + task_graph.name + ) + + def _create_stub_task(self, stub_type, dependencies, api_id, name=None): + model_task = models.Task( + name=name, + dependencies=dependencies, + execution=self._ctx.execution, + _executor=self._stub_executor, + _stub_type=stub_type) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_id + return model_task + + def _create_operation_task(self, api_task, dependencies): + model_task = models.Task.from_api_task( + api_task, self._default_executor, dependencies=dependencies) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_task.id + return model_task + + @staticmethod + def _start_graph_suffix(api_id): + return '{0}-Start'.format(api_id) + + @staticmethod + def _end_graph_suffix(api_id): + return '{0}-End'.format(api_id) + + @staticmethod + def _get_non_dependent_tasks(execution): + tasks_with_dependencies = set() + for task in execution.tasks: + tasks_with_dependencies.update(task.dependencies) + return list(set(execution.tasks) - set(tasks_with_dependencies)) + + def _get_tasks_from_dependencies(self, dependencies): + """ + Returns task list from dependencies. 
+ """ + tasks = [] + for dependency in dependencies: + if getattr(dependency, 'actor', False): + # This is + dependency_name = dependency.id + else: + dependency_name = self._end_graph_suffix(dependency.id) + tasks.extend(task for task in self._ctx.execution.tasks + if self._model_to_api_id.get(task.id, None) == dependency_name) + return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 752706e..780db07 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -15,7 +15,7 @@ import sys -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler def op_path(func, module_path=None): @@ -26,7 +26,7 @@ def op_path(func, module_path=None): def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - compile.GraphCompiler(workflow_context, executor.__class__).compile(graph) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) eng = engine.Engine(executors={executor.__class__: executor}) eng.execute(workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index b7335a0..6046a16 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -16,7 +16,7 @@ import pytest from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from 
aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation import tests @@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir): context.model.node.update(node) graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.GraphCompiler(context, executor.__class__).compile(graph) + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index ab6310c..5b94917 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -28,7 +28,7 @@ from aria.orchestrator.execution_plugin.exceptions import ProcessException from aria.orchestrator.execution_plugin import local from aria.orchestrator.execution_plugin import constants from aria.orchestrator.workflows.executor import process -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from tests import mock from tests import storage @@ -500,7 +500,7 @@ if __name__ == '__main__': arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter - compile.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(workflow_context) return workflow_context.model.node.get_by_name( 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index 13ad1a3..4fa8184 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -29,7 +29,7 @@ from aria.orchestrator import events from aria.orchestrator import workflow from aria.orchestrator.workflows import api from aria.orchestrator.workflows.executor import process -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.exceptions import ExecutorException from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException from aria.orchestrator.execution_plugin import operations @@ -254,7 +254,8 @@ class TestWithActualSSHServer(object): graph.sequence(*ops) return graph tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter - compile.GraphCompiler(self._workflow_context, self._executor.__class__).compile(tasks_graph) + graph_compiler.GraphCompiler( + self._workflow_context, self._executor.__class__).compile(tasks_graph) eng = engine.Engine({self._executor.__class__: self._executor}) eng.execute(self._workflow_context) return self._workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 7275723..21a53d7 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -28,7 +28,7 @@ from 
aria.orchestrator.workflows import ( api, exceptions, ) -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import thread from tests import mock, storage @@ -50,7 +50,7 @@ class BaseTest(object): @staticmethod def _engine(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - compile.GraphCompiler(workflow_context, executor.__class__).compile(graph) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) return engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index 32a6b7b..30cc8ee 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ -16,7 +16,7 @@ import pytest from aria.orchestrator.decorators import operation, workflow -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor.thread import ThreadExecutor from aria.orchestrator.workflows import api from aria.modeling.service_instance import NodeBase @@ -113,7 +113,7 @@ def run_operation_on_node(ctx, op_name, interface_name): operation_name=op_name, operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) node.interfaces[interface.name] = interface - compile.GraphCompiler(ctx, ThreadExecutor).compile( + graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile( single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name) ) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index 3d47d54..f0d2b26 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -18,7 +18,7 @@ from networkx import topological_sort, DiGraph from aria.modeling import models from aria.orchestrator import context from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import compile +from aria.orchestrator.workflows.core import graph_compiler from aria.orchestrator.workflows.executor import base from tests import mock from tests import storage @@ -65,8 +65,8 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(inner_task_graph, simple_before_task) test_task_graph.add_dependency(simple_after_task, inner_task_graph) - graph_compiler = compile.GraphCompiler(workflow_context, base.StubTaskExecutor) - graph_compiler.compile(test_task_graph) + compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor) + compiler.compile(test_task_graph) execution_tasks = topological_sort(_graph(workflow_context.execution.tasks)) @@ -82,7 +82,7 @@ def test_task_graph_into_execution_graph(tmpdir): '{0}-End'.format(test_task_graph.id) ] - assert expected_tasks_names == [graph_compiler._model_to_api_id[t.id] for t in execution_tasks] + assert expected_tasks_names == [compiler._model_to_api_id[t.id] for t in execution_tasks] assert all(isinstance(task, models.Task) for task in execution_tasks) execution_tasks = iter(execution_tasks) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index aa08685..6ed3e2b 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -17,7 +17,7 @@ import pytest from aria import extension from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation @@ -57,7 +57,7 @@ def test_decorate_extension(context, executor): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.GraphCompiler(context, executor.__class__).compile(graph) + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = get_node(context).attributes.get('out').value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/660139ed/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 7102b13..a74a473 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -18,7 +18,7 @@ import copy import pytest from 
aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation from aria.orchestrator.workflows import exceptions @@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.GraphCompiler(context, executor.__class__).compile(graph) + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
