wip
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/e7e1d895 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/e7e1d895 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/e7e1d895 Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: e7e1d8951708bea38705a9265df07856b3f2a22c Parents: 62875b5 Author: max-orlov <[email protected]> Authored: Sun Jul 2 21:43:43 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jul 9 14:05:24 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 4 ++- aria/orchestrator/workflow_runner.py | 7 +++-- aria/orchestrator/workflows/core/engine.py | 39 ++++++++++++++++++++----- 3 files changed, 40 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 7068557..4d7a5b5 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin): PENDING: (STARTED, CANCELLED), STARTED: END_STATES + (CANCELLING,), CANCELLING: END_STATES, - CANCELLED: PENDING + # Retrying + CANCELLED: PENDING, + FAILED: PENDING } # region one_to_many relationships http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index df1725f..b7de7b5 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): def __init__(self, model_storage, resource_storage, plugin_manager, - execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None, + execution_id=None, retry_failed=False, + service_id=None, workflow_name=None, inputs=None, executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): """ @@ -62,6 +63,7 @@ class WorkflowRunner(object): "and service id with inputs") self._is_resume = execution_id is not None + self._retry_failed = retry_failed self._model_storage = model_storage self._resource_storage = resource_storage @@ -116,7 +118,8 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume) + self._engine.execute( + ctx=self._workflow_context, resuming=self._is_resume, retry_failing=self._retry_failed) def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e7e1d895/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index d9c77e9..d22b535 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -41,14 +41,15 @@ class Engine(logger.LoggerMixin): self._executors = executors.copy() self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) - def execute(self, ctx, resuming=False): + def execute(self, ctx, resuming=False, retry_failing=False): """ Executes the workflow. """ if resuming: events.on_resume_workflow_signal.send(ctx) - tasks_tracker = _TasksTracker(ctx) + tasks_tracker = _TasksTracker(ctx, retry_failing) + try: events.start_workflow_signal.send(ctx) while True: @@ -124,13 +125,37 @@ class Engine(logger.LoggerMixin): class _TasksTracker(object): - def __init__(self, ctx): + + def __init__(self, ctx, retry_failing=False): self._ctx = ctx + + if retry_failing: + self._has_task_ended = self._retry_failed_has_task_ended + self._is_task_waiting = self._retry_failed_is_task_waiting + self._tasks = ctx.execution.tasks - self._executed_tasks = [task for task in self._tasks if task.has_ended()] + self._executed_tasks = [task for task in self._tasks if self._has_task_ended(task)] self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) self._executing_tasks = [] + @staticmethod + def _retry_failed_has_task_ended(task): + # only succeeded tasks have ended (for retry failing) + return task.status == task.SUCCESS or (task.status == task.FAILED and task.ignore_failure) + + @staticmethod + def _retry_failed_is_task_waiting(task): + # failed tasks are waiting to be executed (for retry failing) + return task.is_waiting() + + @staticmethod + def _has_task_ended(task): + return task.has_ended() + + @staticmethod + def _is_task_waiting(task): + return task.is_waiting() + @property def all_tasks_consumed(self): return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 @@ -148,15 +173,15 @@ class _TasksTracker(object): @property def ended_tasks(self): for task in self.executing_tasks: - if task.has_ended(): + if self._has_task_ended(task): yield task @property def executable_tasks(self): now = datetime.utcnow() # we need both lists since retrying task are in the executing task list. - for task in self._update_tasks(self._executing_tasks + self._executable_tasks): - if all([task.is_waiting(), + for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)): + if all([self._is_task_waiting(task), task.due_at <= now, all(dependency in self._executed_tasks for dependency in task.dependencies) ]):
