Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution 827230da2 -> ae2bab3da
review 1 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ae2bab3d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ae2bab3d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ae2bab3d Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution Commit: ae2bab3dacd38bebf9b6bb3394ab6f9fc5471cfc Parents: 827230d Author: max-orlov <[email protected]> Authored: Sun Jun 25 14:01:56 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jun 25 14:01:56 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/core/compile.py | 122 ------------------- aria/orchestrator/workflows/core/engine.py | 18 +-- .../workflows/core/graph_compiler.py | 120 ++++++++++++++++++ 3 files changed, 129 insertions(+), 131 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ae2bab3d/aria/orchestrator/workflows/core/compile.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/compile.py b/aria/orchestrator/workflows/core/compile.py deleted file mode 100644 index 83de22c..0000000 --- a/aria/orchestrator/workflows/core/compile.py +++ /dev/null @@ -1,122 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ....modeling import models -from .. import executor, api - - -# TODO: is class really needed? - -class GraphCompiler(object): - def __init__(self, ctx, default_executor): - self._ctx = ctx - self._default_executor = default_executor - self._stub_executor = executor.base.StubTaskExecutor - self._model_to_api_id = {} - - def compile(self, - task_graph, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param start_stub_type: internal use - :param end_stub_type: internal use - :param depends_on: internal use - """ - task_graph = task_graph or self._task_graph - depends_on = list(depends_on) - - # Insert start marker - start_task = self._create_stub_task( - start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name, - ) - - for task in task_graph.topological_order(reverse=True): - dependencies = \ - (self._get_tasks_from_dependencies(task_graph.get_dependencies(task)) - or [start_task]) - - if isinstance(task, api.task.OperationTask): - self._create_operation_task(task, dependencies) - - elif isinstance(task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - self.compile( - task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies - ) - elif isinstance(task, api.task.StubTask): - self._create_stub_task(models.Task.STUB, dependencies, task.id) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - self._create_stub_task( - end_stub_type, - self._get_non_dependent_tasks(self._ctx.execution) or [start_task], - self._end_graph_suffix(task_graph.id), - task_graph.name - ) - - def _create_stub_task(self, stub_type, dependencies, api_id, name=None): - model_task = models.Task( - name=name, - dependencies=dependencies, - execution=self._ctx.execution, - _executor=self._stub_executor, - _stub_type=stub_type) - self._ctx.model.task.put(model_task) - self._model_to_api_id[model_task.id] = api_id - return model_task - - def _create_operation_task(self, api_task, dependencies): - model_task = models.Task.from_api_task( - api_task, self._default_executor, dependencies=dependencies) - self._ctx.model.task.put(model_task) - self._model_to_api_id[model_task.id] = api_task.id - return model_task - - @staticmethod - def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - @staticmethod - def _end_graph_suffix(api_id): - return '{0}-End'.format(api_id) - - @staticmethod - def _get_non_dependent_tasks(execution): - tasks_with_dependencies = set() - for task in execution.tasks: - tasks_with_dependencies.update(task.dependencies) - return list(set(execution.tasks) - set(tasks_with_dependencies)) - - def _get_tasks_from_dependencies(self, dependencies): - """ - Returns task list from dependencies. - """ - tasks = [] - for dependency in dependencies: - if getattr(dependency, 'actor', False): - # This is - dependency_name = dependency.id - else: - dependency_name = self._end_graph_suffix(dependency.id) - tasks.extend(task for task in self._ctx.execution.tasks - if self._model_to_api_id.get(task.id, None) == dependency_name) - return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ae2bab3d/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index f594e36..d52ae85 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -48,20 +48,20 @@ class Engine(logger.LoggerMixin): if resuming: events.on_resume_workflow_signal.send(ctx) - task_tracker = _TasksTracker(ctx) + tasks_tracker = _TasksTracker(ctx) try: events.start_workflow_signal.send(ctx) while True: cancel = self._is_cancel(ctx) if cancel: break - for task in task_tracker.ended_tasks: + for task in tasks_tracker.ended_tasks: self._handle_ended_tasks(task) - task_tracker.finished_(task) - for task in task_tracker.executable_tasks: - task_tracker.executing_(task) + tasks_tracker.finished(task) + for task in tasks_tracker.executable_tasks: + tasks_tracker.executing(task) self._handle_executable_task(ctx, task) - if task_tracker.all_tasks_consumed: + if tasks_tracker.all_tasks_consumed: break else: time.sleep(0.1) @@ -125,13 +125,13 @@ class _TasksTracker(object): def all_tasks_consumed(self): return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 - def executing_(self, task): + def executing(self, task): # Task executing could be retrying (thus removed and added earlier) if task not in self._executing_tasks: self._executable_tasks.remove(task) self._executing_tasks.append(task) - def finished_(self, task): + def finished(self, task): self._executing_tasks.remove(task) self._executed_tasks.append(task) @@ -144,7 +144,7 @@ class _TasksTracker(object): @property def executable_tasks(self): now = datetime.utcnow() - # we need both list since retrying task are in the executing task list. + # we need both lists since retrying task are in the executing task list. for task in self._update_tasks(self._executing_tasks + self._executable_tasks): if all([task.is_waiting(), task.due_at <= now, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ae2bab3d/aria/orchestrator/workflows/core/graph_compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py new file mode 100644 index 0000000..f339038 --- /dev/null +++ b/aria/orchestrator/workflows/core/graph_compiler.py @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ....modeling import models +from .. import executor, api + + +class GraphCompiler(object): + def __init__(self, ctx, default_executor): + self._ctx = ctx + self._default_executor = default_executor + self._stub_executor = executor.base.StubTaskExecutor + self._model_to_api_id = {} + + def compile(self, + task_graph, + start_stub_type=models.Task.START_WORKFLOW, + end_stub_type=models.Task.END_WORKFLOW, + depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param start_stub_type: internal use + :param end_stub_type: internal use + :param depends_on: internal use + """ + task_graph = task_graph or self._task_graph + depends_on = list(depends_on) + + # Insert start marker + start_task = self._create_stub_task( + start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name, + ) + + for task in task_graph.topological_order(reverse=True): + dependencies = \ + (self._get_tasks_from_dependencies(task_graph.get_dependencies(task)) + or [start_task]) + + if isinstance(task, api.task.OperationTask): + self._create_operation_task(task, dependencies) + + elif isinstance(task, api.task.WorkflowTask): + # Build the graph recursively while adding start and end markers + self.compile( + task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies + ) + elif isinstance(task, api.task.StubTask): + self._create_stub_task(models.Task.STUB, dependencies, task.id) + else: + raise RuntimeError('Undefined state') + + # Insert end marker + self._create_stub_task( + end_stub_type, + self._get_non_dependent_tasks(self._ctx.execution) or [start_task], + self._end_graph_suffix(task_graph.id), + task_graph.name + ) + + def _create_stub_task(self, stub_type, dependencies, api_id, name=None): + model_task = models.Task( + name=name, + dependencies=dependencies, + execution=self._ctx.execution, + _executor=self._stub_executor, + _stub_type=stub_type) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_id + return model_task + + def _create_operation_task(self, api_task, dependencies): + model_task = models.Task.from_api_task( + api_task, self._default_executor, dependencies=dependencies) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_task.id + return model_task + + @staticmethod + def _start_graph_suffix(api_id): + return '{0}-Start'.format(api_id) + + @staticmethod + def _end_graph_suffix(api_id): + return '{0}-End'.format(api_id) + + @staticmethod + def _get_non_dependent_tasks(execution): + tasks_with_dependencies = set() + for task in execution.tasks: + tasks_with_dependencies.update(task.dependencies) + return list(set(execution.tasks) - set(tasks_with_dependencies)) + + def _get_tasks_from_dependencies(self, dependencies): + """ + Returns task list from dependencies. + """ + tasks = [] + for dependency in dependencies: + if getattr(dependency, 'actor', False): + # This is + dependency_name = dependency.id + else: + dependency_name = self._end_graph_suffix(dependency.id) + tasks.extend(task for task in self._ctx.execution.tasks + if self._model_to_api_id.get(task.id, None) == dependency_name) + return tasks
