Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution 92773e206 -> a86ba295e (forced update)
fix retying mechanism Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a86ba295 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a86ba295 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a86ba295 Branch: refs/heads/ARIA-284-Cleanup-and-optimize-the-task-execution Commit: a86ba295e94569f5cd29601f1da906b2af053e4a Parents: a66b74b Author: max-orlov <ma...@gigaspaces.com> Authored: Wed Jun 21 15:25:44 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Wed Jun 21 15:51:25 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/workflow.py | 2 -- aria/orchestrator/workflows/core/engine.py | 15 +++++++++------ 2 files changed, 9 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a86ba295/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 2da3d4c..18334f3 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -20,8 +20,6 @@ Workflow and operation contexts import threading from contextlib import contextmanager -from networkx import DiGraph - from .exceptions import ContextException from .common import BaseContext http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a86ba295/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 8999232..603914e 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -53,8 +53,8 @@ class Engine(logger.LoggerMixin): if cancel: break for task in task_tracker.ended_tasks: - task_tracker.finished_(task) self._handle_ended_tasks(task) + task_tracker.finished_(task) for task in task_tracker.executable_tasks: task_tracker.executing_(task) self._handle_executable_task(ctx, task) @@ -110,7 +110,7 @@ class Engine(logger.LoggerMixin): raise exceptions.ExecutorException('Workflow failed') -class _TasksTracker(): +class _TasksTracker(object): def __init__(self, ctx): self._ctx = ctx self._tasks = ctx.execution.tasks @@ -123,8 +123,10 @@ class _TasksTracker(): return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 def executing_(self, task): - self._executable_tasks.remove(task) - self._executing_tasks.append(task) + # Task executing could be retrying (thus removed and added earlier) + if task not in self._executing_tasks: + self._executable_tasks.remove(task) + self._executing_tasks.append(task) def finished_(self, task): self._executing_tasks.remove(task) @@ -139,7 +141,8 @@ class _TasksTracker(): @property def executable_tasks(self): now = datetime.utcnow() - for task in self._update_tasks(self._executable_tasks): + # we need both list since retrying task are in the executing task list. + for task in self._update_tasks(self._executing_tasks + self._executable_tasks): if all([task.is_waiting(), task.due_at <= now, all(dependency in self._executed_tasks for dependency in task.dependencies) @@ -163,4 +166,4 @@ class _TasksTracker(): def _update_tasks(self, tasks): for task in tasks: - yield self._ctx.model.task.refresh(task) \ No newline at end of file + yield self._ctx.model.task.refresh(task)