Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions [created] bf753679a
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/bf753679 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/bf753679 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/bf753679 Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: bf753679a8fcbd7e2dbe6afc0f013fa80ad60720 Parents: b30a7ed Author: max-orlov <[email protected]> Authored: Sun Jul 2 21:43:43 2017 +0300 Committer: max-orlov <[email protected]> Committed: Mon Jul 10 15:49:53 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 4 +- aria/orchestrator/workflow_runner.py | 7 +- aria/orchestrator/workflows/core/engine.py | 18 ++- .../workflows/core/events_handler.py | 9 +- tests/modeling/test_models.py | 3 +- tests/orchestrator/test_workflow_runner.py | 116 +++++++++++++++++-- 6 files changed, 138 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bf753679/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index df2643e..4d4f0fe 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin): PENDING: (STARTED, CANCELLED), STARTED: END_STATES + (CANCELLING,), CANCELLING: END_STATES, - CANCELLED: PENDING + # Retrying + CANCELLED: PENDING, + FAILED: PENDING } # region one_to_many relationships http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bf753679/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 
47270c0..2bd3043 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): def __init__(self, model_storage, resource_storage, plugin_manager, - execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None, + execution_id=None, retry_failed=False, + service_id=None, workflow_name=None, inputs=None, executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): """ @@ -62,6 +63,7 @@ class WorkflowRunner(object): "and service id with inputs") self._is_resume = execution_id is not None + self._retry_failed = retry_failed self._model_storage = model_storage self._resource_storage = resource_storage @@ -116,7 +118,8 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume) + self._engine.execute( + ctx=self._workflow_context, resuming=self._is_resume, retry_failed=self._retry_failed) def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bf753679/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index d9c77e9..69505fc 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -41,14 +41,15 @@ class Engine(logger.LoggerMixin): self._executors = executors.copy() self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) - def execute(self, ctx, resuming=False): + def execute(self, ctx, resuming=False, retry_failed=False): """ Executes the workflow. 
""" if resuming: - events.on_resume_workflow_signal.send(ctx) + events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed) tasks_tracker = _TasksTracker(ctx) + try: events.start_workflow_signal.send(ctx) while True: @@ -68,8 +69,15 @@ class Engine(logger.LoggerMixin): if cancel: self._terminate_tasks(tasks_tracker.executing_tasks) events.on_cancelled_workflow_signal.send(ctx) - else: + elif all(task.status == task.SUCCESS or task.ignore_failure + for task in ctx.execution.tasks): events.on_success_workflow_signal.send(ctx) + else: + exception = "Tasks {tasks} remain failed".format( + tasks= + [t for t in ctx.execution.tasks if t.status == t.SUCCESS or t.ignore_failure] + ) + events.on_failure_workflow_signal.send(ctx, exception=exception) except BaseException as e: # Cleanup any remaining tasks self._terminate_tasks(tasks_tracker.executing_tasks) @@ -124,8 +132,10 @@ class Engine(logger.LoggerMixin): class _TasksTracker(object): + def __init__(self, ctx): self._ctx = ctx + self._tasks = ctx.execution.tasks self._executed_tasks = [task for task in self._tasks if task.has_ended()] self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) @@ -155,7 +165,7 @@ class _TasksTracker(object): def executable_tasks(self): now = datetime.utcnow() # we need both lists since retrying task are in the executing task list. 
- for task in self._update_tasks(self._executing_tasks + self._executable_tasks): + for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)): if all([task.is_waiting(), task.due_at <= now, all(dependency in self._executed_tasks for dependency in task.dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bf753679/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 37801de..5ac1ce8 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -119,7 +119,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): @events.on_resume_workflow_signal.connect -def _workflow_resume(workflow_context, *args, **kwargs): +def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs): with workflow_context.persist_changes: execution = workflow_context.execution execution.status = execution.PENDING @@ -128,6 +128,13 @@ def _workflow_resume(workflow_context, *args, **kwargs): if not task.has_ended(): task.status = task.PENDING + if retry_failed: + for task in execution.tasks: + if task.status == task.FAILED and not task.ignore_failure: + task.attempts_count = 0 + task.status = task.PENDING + + @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bf753679/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index e1167fc..25b4080 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -324,8 +324,7 @@ class TestExecution(object): Execution.STARTED: [Execution.PENDING], Execution.CANCELLING: 
[Execution.PENDING, Execution.STARTED], - Execution.FAILED: [Execution.PENDING, - Execution.STARTED, + Execution.FAILED: [Execution.STARTED, Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING], http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bf753679/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index a77d727..833b494 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -51,7 +51,8 @@ custom_events = { 'is_resumed': Event(), 'is_active': Event(), 'execution_cancelled': Event(), - 'execution_ended': Event() + 'execution_ended': Event(), + 'execution_failed': Event() } @@ -166,7 +167,8 @@ def test_execute(request, service): assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow' mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context, - resuming=False) + resuming=False, + retry_failed=False) def test_cancel_execution(request): @@ -361,7 +363,7 @@ class TestResumableWorkflows(object): self._create_interface(workflow_context, node, mock_resuming_task) wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_parallel_workflow, thread_executor) + workflow_context, mock_two_parallel_tasks_workflow, thread_executor) wf_thread = Thread(target=wf_runner.execute) wf_thread.daemon = True @@ -478,6 +480,76 @@ class TestResumableWorkflows(object): assert task.status == task.SUCCESS assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_three_different_tasks) 
+ + wf_runner = self._create_initial_workflow_runner( + workflow_context, mock_three_parallel_tasks_workflow, thread_executor) + wf_thread = Thread(target=wf_runner.execute) + wf_thread.setDaemon(True) + wf_thread.start() + + if custom_events['execution_ended'].wait(60) is False: + raise TimeoutError("Execution did not end") + + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + assert node.attributes['invocations'].value == 3 + + # The second invocation fails, so one task must be FAILED + assert any(task.status == task.FAILED for task in tasks) + # The first invocation passes, so one task must be SUCCESS + assert any(task.status == task.SUCCESS for task in tasks) + # The third task remains stuck + + assert wf_runner.execution.status in wf_runner.execution.FAILED + + custom_events['is_resumed'].set() + # The third task ends + assert node.attributes['invocations'].value == 3 + + # Create a new workflow runner, with an existing execution id. This would cause + # the old execution to restart. + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=new_thread_executor) + + new_wf_runner.execute() + finally: + new_thread_executor.close() + assert node.attributes['invocations'].value == 3 + + # Resuming the workflow with retry_failed=True should retry only the second task + # (as it was the only one that failed) + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + retry_failed=True, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=new_thread_executor) + + new_wf_runner.execute() + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. 
+ assert node.attributes['invocations'].value == 4 + assert all(task.status == task.SUCCESS for task in tasks) + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + @staticmethod @pytest.fixture def thread_executor(): @@ -537,7 +609,7 @@ class TestResumableWorkflows(object): @workflow -def mock_parallel_workflow(ctx, graph): +def mock_two_parallel_tasks_workflow(ctx, graph): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) graph.add_tasks( api.task.OperationTask( @@ -563,11 +635,8 @@ def mock_resuming_task(ctx): def mock_single_task_workflow(ctx, graph): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) graph.add_tasks( - api.task.OperationTask(node, - interface_name='aria.interfaces.lifecycle', - operation_name='create', - retry_interval=1, - max_attempts=10), + api.task.OperationTask( + node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=10) ) @@ -600,3 +669,32 @@ def mock_stuck_task(ctx): if not custom_events['is_active'].isSet(): custom_events['is_active'].set() time.sleep(5) + + +@workflow +def mock_three_parallel_tasks_workflow(ctx, graph): + node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + graph.add_tasks( + api.task.OperationTask( + node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1), + api.task.OperationTask( + node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1), + api.task.OperationTask( + node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1), + ) + + +@operation +def mock_three_different_tasks(ctx): + ctx.node.attributes['invocations'] += 1 + + # The first task should end gracefully + + if ctx.node.attributes['invocations'] == 2: + # The second task should fail + raise BaseException("First execution should fail") + + elif ctx.node.attributes['invocations'] == 3: + # The third task should remain stuck until the execution is resumed + while not 
custom_events['is_resumed'].isSet(): + pass
