Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 123a55ce7 -> 79dc7e75e
review 1 fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/79dc7e75 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/79dc7e75 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/79dc7e75 Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: 79dc7e75ec46bd036485431957548b1c3b06dfed Parents: 123a55c Author: max-orlov <[email protected]> Authored: Tue Jul 11 16:49:15 2017 +0300 Committer: max-orlov <[email protected]> Committed: Tue Jul 11 16:49:15 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflow_runner.py | 9 +- aria/orchestrator/workflows/core/engine.py | 9 +- .../workflows/core/events_handler.py | 1 + tests/orchestrator/test_workflow_runner.py | 89 ++++++++++++++++++-- 4 files changed, 87 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 2bd3043..a85e7d3 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -38,7 +38,7 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): def __init__(self, model_storage, resource_storage, plugin_manager, - execution_id=None, retry_failed=False, + execution_id=None, retry_failed_tasks=False, service_id=None, workflow_name=None, inputs=None, executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): @@ -63,7 +63,7 @@ class WorkflowRunner(object): "and service id with inputs") self._is_resume = execution_id is not None - self._retry_failed = retry_failed + self._retry_failed_tasks 
= retry_failed_tasks self._model_storage = model_storage self._resource_storage = resource_storage @@ -118,8 +118,9 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute( - ctx=self._workflow_context, resuming=self._is_resume, retry_failed=self._retry_failed) + self._engine.execute(ctx=self._workflow_context, + resuming=self._is_resume, + retry_failed=self._retry_failed_tasks) def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 69505fc..0ec3cd8 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -69,15 +69,8 @@ class Engine(logger.LoggerMixin): if cancel: self._terminate_tasks(tasks_tracker.executing_tasks) events.on_cancelled_workflow_signal.send(ctx) - elif all(task.status == task.SUCCESS or task.ignore_failure - for task in ctx.execution.tasks): - events.on_success_workflow_signal.send(ctx) else: - exception = "Tasks {tasks} remain failed".format( - tasks= - [t for t in ctx.execution.tasks if t.status == t.SUCCESS or t.ignore_failure] - ) - events.on_failure_workflow_signal.send(ctx, exception=exception) + events.on_success_workflow_signal.send(ctx) except BaseException as e: # Cleanup any remaining tasks self._terminate_tasks(tasks_tracker.executing_tasks) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 5ac1ce8..219d2df 100644 --- 
a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -71,6 +71,7 @@ def _task_succeeded(ctx, *args, **kwargs): with ctx.persist_changes: ctx.task.ended_at = datetime.utcnow() ctx.task.status = ctx.task.SUCCESS + ctx.task.attempts_count += 1 _update_node_state_if_necessary(ctx) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index adb19e6..30176ae 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -359,7 +359,7 @@ class TestResumableWorkflows(object): def test_resume_workflow(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) - self._create_interface(workflow_context, node, mock_failed_task) + self._create_interface(workflow_context, node, mock_pass_first_task_only) wf_runner = self._create_initial_workflow_runner( workflow_context, mock_parallel_tasks_workflow, thread_executor, @@ -491,13 +491,13 @@ class TestResumableWorkflows(object): def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) - self._create_interface(workflow_context, node, mock_failed_task) + self._create_interface(workflow_context, node, mock_pass_first_task_only) wf_runner = self._create_initial_workflow_runner( workflow_context, mock_parallel_tasks_workflow, thread_executor, - inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2} + inputs={'retry_interval': 1, 'max_attempts': 2, 
'number_of_tasks': 2} ) wf_thread = Thread(target=wf_runner.execute) wf_thread.setDaemon(True) @@ -508,10 +508,12 @@ class TestResumableWorkflows(object): tasks = workflow_context.model.task.list(filters={'_stub_type': None}) node = workflow_context.model.node.refresh(node) - assert node.attributes['invocations'].value == 2 + assert node.attributes['invocations'].value == 3 + failed_task = [t for t in tasks if t.status == t.FAILED][0] # First task passes assert any(task.status == task.FAILED for task in tasks) + assert failed_task.attempts_count == 2 # Second task fails assert any(task.status == task.SUCCESS for task in tasks) assert wf_runner.execution.status in wf_runner.execution.FAILED @@ -521,7 +523,7 @@ class TestResumableWorkflows(object): try: new_wf_runner = WorkflowRunner( service_id=wf_runner.service.id, - retry_failed=True, + retry_failed_tasks=True, inputs={}, model_storage=workflow_context.model, resource_storage=workflow_context.resource, @@ -535,10 +537,60 @@ class TestResumableWorkflows(object): # Wait for it to finish and assert changes. 
node = workflow_context.model.node.refresh(node) - assert node.attributes['invocations'].value == 3 + assert failed_task.attempts_count == 1 + assert node.attributes['invocations'].value == 4 assert all(task.status == task.SUCCESS for task in tasks) assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_fail_first_task_only) + + wf_runner = self._create_initial_workflow_runner( + workflow_context, + mock_sequential_tasks_workflow, + thread_executor, + inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2} + ) + wf_thread = Thread(target=wf_runner.execute) + wf_thread.setDaemon(True) + wf_thread.start() + + if custom_events['execution_failed'].wait(60) is False: + raise TimeoutError("Execution did not end") + + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 1 + assert any(t.status == t.FAILED for t in tasks) + assert any(t.status == t.PENDING for t in tasks) + + custom_events['is_resumed'].set() + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=new_thread_executor) + + new_wf_runner.execute() + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. 
+ node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 2 + assert any(t.status == t.SUCCESS for t in tasks) + assert any(t.status == t.FAILED for t in tasks) + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + + + @staticmethod @pytest.fixture def thread_executor(): @@ -598,10 +650,21 @@ class TestResumableWorkflows(object): @workflow +def mock_sequential_tasks_workflow(ctx, graph, + retry_interval=1, max_attempts=10, number_of_tasks=1): + node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks)) + + +@workflow def mock_parallel_tasks_workflow(ctx, graph, retry_interval=1, max_attempts=10, number_of_tasks=1): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - tasks = [ + graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks)) + + +def _create_tasks(node, retry_interval, max_attempts, number_of_tasks): + return [ api.task.OperationTask(node, 'aria.interfaces.lifecycle', 'create', @@ -609,7 +672,7 @@ def mock_parallel_tasks_workflow(ctx, graph, max_attempts=max_attempts) for _ in xrange(number_of_tasks) ] - graph.add_tasks(*tasks) + @operation @@ -644,7 +707,7 @@ def mock_stuck_task(ctx): @operation -def mock_failed_task(ctx): +def mock_pass_first_task_only(ctx): ctx.node.attributes['invocations'] += 1 if ctx.node.attributes['invocations'] != 1: @@ -653,3 +716,11 @@ def mock_failed_task(ctx): # if resume was called, increase by one. o/w fail the execution - second task should # fail as long it was not a part of resuming the workflow raise FailingTask("wasn't resumed yet") + + +@operation +def mock_fail_first_task_only(ctx): + ctx.node.attributes['invocations'] += 1 + + if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1: + raise FailingTask("First task should fail")
