Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 34b64dcce -> ffc0d85ac (forced update)
added test


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ffc0d85a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ffc0d85a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ffc0d85a

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: ffc0d85acd92f732c8317de5d317fc8f31090964
Parents: d4db727
Author: max-orlov <[email protected]>
Authored: Sun Jul 9 16:19:30 2017 +0300
Committer: max-orlov <[email protected]>
Committed: Sun Jul 9 17:30:21 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py |   9 +-
 tests/orchestrator/test_workflow_runner.py | 125 +++++++++++++++++++++++---
 2 files changed, 119 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ffc0d85a/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 0ec3cd8..69505fc 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -69,8 +69,15 @@ class Engine(logger.LoggerMixin):
             if cancel:
                 self._terminate_tasks(tasks_tracker.executing_tasks)
                 events.on_cancelled_workflow_signal.send(ctx)
-            else:
+            elif all(task.status == task.SUCCESS or task.ignore_failure
+                     for task in ctx.execution.tasks):
                 events.on_success_workflow_signal.send(ctx)
+            else:
+                exception = "Tasks {tasks} remain failed".format(
+                    tasks=[t for t in ctx.execution.tasks
+                           if t.status != t.SUCCESS and not t.ignore_failure]
+                )
+                events.on_failure_workflow_signal.send(ctx, exception=exception)
         except BaseException as e:
             # Cleanup any remaining tasks
             self._terminate_tasks(tasks_tracker.executing_tasks)


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ffc0d85a/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 7ed0182..1bfc0ce 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -50,7 +50,8 @@ custom_events = {
     'is_resumed': Event(),
     'is_active': Event(),
     'execution_cancelled': Event(),
-    'execution_ended': Event()
+    'execution_ended': Event(),
+    'execution_failed': Event()
 }
 
 
@@ -349,7 +350,7 @@ class TestResumableWorkflows(object):
         self._create_interface(workflow_context, node, mock_resuming_task)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_workflow, thread_executor)
+            workflow_context, mock_two_parallel_tasks_workflow, thread_executor)
 
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
@@ -387,14 +388,14 @@ class TestResumableWorkflows(object):
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
-    def test_resume_failed_task(self, workflow_context, thread_executor):
+    def test_resume_single_failed_task(self, workflow_context, thread_executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
 
-        self._create_interface(workflow_context, node, mock_failed_first_task)
+        self._create_interface(workflow_context, node, mock_conditional_failing_task)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_sequential_workflow, thread_executor)
+            workflow_context, mock_single_task_workflow, thread_executor)
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.setDaemon(True)
         wf_thread.start()
@@ -436,6 +437,76 @@ class TestResumableWorkflows(object):
             assert task.status == task.SUCCESS
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
+    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_three_different_tasks)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_three_parallel_tasks_workflow, thread_executor)
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        if custom_events['execution_ended'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        assert node.attributes['invocations'].value == 3
+
+        # The task that runs second fails...
+        assert any(task.status == task.FAILED for task in tasks)
+        # ...while the first and the third tasks pass
+        assert any(task.status == task.SUCCESS for task in tasks)
+
+        # A single failed task is enough to mark the whole execution as failed
+        assert wf_runner.execution.status == wf_runner.execution.FAILED
+
+        custom_events['is_resumed'].set()
+        # Nothing runs again until the execution is resumed
+        assert node.attributes['invocations'].value == 3
+
+        # Create a new workflow runner with the existing execution id. This resumes the old
+        # execution; without retry_failed the failed task must not be re-run.
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+        assert node.attributes['invocations'].value == 3
+
+        # Resuming the workflow with retry_failed=True should re-run only the task that
+        # failed (the second one), i.e. exactly one more invocation is expected.
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                retry_failed=True,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        assert node.attributes['invocations'].value == 4
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
     @staticmethod
     @pytest.fixture
     def thread_executor():
@@ -495,7 +566,7 @@ class TestResumableWorkflows(object):
 
 
 @workflow
-def mock_parallel_workflow(ctx, graph):
+def mock_two_parallel_tasks_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
     graph.add_tasks(
         api.task.OperationTask(
@@ -518,19 +589,16 @@ def mock_resuming_task(ctx):
 
 
 @workflow
-def mock_sequential_workflow(ctx, graph):
+def mock_single_task_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.sequence(
-        api.task.OperationTask(node,
-                               interface_name='aria.interfaces.lifecycle',
-                               operation_name='create',
-                               retry_interval=1,
-                               max_attempts=10),
+    graph.add_tasks(
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=10)
     )
 
 
 @operation
-def mock_failed_first_task(ctx):
+def mock_conditional_failing_task(ctx):
     """
     The task should run atmost ctx.task.max_attempts - 1 times, and only then pass.
     overall, the number of invocations should be ctx.task.max_attempts - 1
@@ -549,3 +617,32 @@ def mock_failed_first_task(ctx):
     else:
         # fail o.w.
         raise BaseException("stop this task")
+
+
+@workflow
+def mock_three_parallel_tasks_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+        api.task.OperationTask(
+            node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1),
+    )
+
+
+@operation
+def mock_three_different_tasks(ctx):
+    ctx.node.attributes['invocations'] += 1
+
+    # The first task should end gracefully
+
+    if ctx.node.attributes['invocations'] == 2:
+        # The second task should fail
+        raise BaseException("Second task should fail")
+
+    elif ctx.node.attributes['invocations'] == 3:
+        # The third task passes as well. It must not block: the test waits for the
+        # execution to end before it sets custom_events['is_resumed'].
+        return
