Removed the additional logic and instead just reset the tasks in the resume-event handler
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/684b38fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/684b38fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/684b38fe Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: 684b38fe79adf3e5170a6c434598906f28d193dd Parents: e7e1d89 Author: max-orlov <[email protected]> Authored: Wed Jul 5 17:22:00 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jul 9 14:05:24 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflow_runner.py | 2 +- aria/orchestrator/workflows/core/engine.py | 36 ++++---------------- .../workflows/core/events_handler.py | 9 ++++- 3 files changed, 16 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/684b38fe/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index b7de7b5..5592326 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -119,7 +119,7 @@ class WorkflowRunner(object): def execute(self): self._engine.execute( - ctx=self._workflow_context, resuming=self._is_resume, retry_failing=self._retry_failed) + ctx=self._workflow_context, resuming=self._is_resume, retry_failed=self._retry_failed) def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/684b38fe/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 
d22b535..0ec3cd8 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -41,14 +41,14 @@ class Engine(logger.LoggerMixin): self._executors = executors.copy() self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) - def execute(self, ctx, resuming=False, retry_failing=False): + def execute(self, ctx, resuming=False, retry_failed=False): """ Executes the workflow. """ if resuming: - events.on_resume_workflow_signal.send(ctx) + events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed) - tasks_tracker = _TasksTracker(ctx, retry_failing) + tasks_tracker = _TasksTracker(ctx) try: events.start_workflow_signal.send(ctx) @@ -126,36 +126,14 @@ class Engine(logger.LoggerMixin): class _TasksTracker(object): - def __init__(self, ctx, retry_failing=False): + def __init__(self, ctx): self._ctx = ctx - if retry_failing: - self._has_task_ended = self._retry_failed_has_task_ended - self._is_task_waiting = self._retry_failed_is_task_waiting - self._tasks = ctx.execution.tasks - self._executed_tasks = [task for task in self._tasks if self._has_task_ended(task)] + self._executed_tasks = [task for task in self._tasks if task.has_ended()] self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) self._executing_tasks = [] - @staticmethod - def _retry_failed_has_task_ended(task): - # only succeeded tasks have ended (for retry failing) - return task.status == task.SUCCESS or (task.status == task.FAILED and task.ignore_failure) - - @staticmethod - def _retry_failed_is_task_waiting(task): - # failed tasks are waiting to be executed (for retry failing) - return task.is_waiting() - - @staticmethod - def _has_task_ended(task): - return task.has_ended() - - @staticmethod - def _is_task_waiting(task): - return task.is_waiting() - @property def all_tasks_consumed(self): return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 @@ -173,7 +151,7 @@ class _TasksTracker(object): 
@property def ended_tasks(self): for task in self.executing_tasks: - if self._has_task_ended(task): + if task.has_ended(): yield task @property @@ -181,7 +159,7 @@ class _TasksTracker(object): now = datetime.utcnow() # we need both lists since retrying task are in the executing task list. for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)): - if all([self._is_task_waiting(task), + if all([task.is_waiting(), task.due_at <= now, all(dependency in self._executed_tasks for dependency in task.dependencies) ]): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/684b38fe/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index eb6f271..8a6c03d 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -123,11 +123,18 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): @events.on_resume_workflow_signal.connect -def _workflow_resume(workflow_context, *args, **kwargs): +def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs): with workflow_context.persist_changes: execution = workflow_context.execution execution.status = execution.PENDING + if retry_failed: + for task in execution.tasks: + if task.status == task.FAILED and not task.ignore_failure: + task.attempts_count = 0 + task.status = task.PENDING + + @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs):
