Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 53dc64e4c -> 9eca32a84
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9eca32a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9eca32a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9eca32a8 Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: 9eca32a842ae817e1df4ed78236341822e9715f8 Parents: 53dc64e Author: max-orlov <[email protected]> Authored: Sun Jul 2 21:43:43 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jul 2 21:43:43 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 4 +- aria/orchestrator/workflow_runner.py | 7 +- aria/orchestrator/workflows/core/engine.py | 28 ++++-- tests/orchestrator/test_workflow_runner.py | 121 ++++++++++++++++++++---- 4 files changed, 134 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9eca32a8/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 829c305..43557a8 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -69,7 +69,9 @@ class ExecutionBase(mixins.ModelMixin): PENDING: (STARTED, CANCELLED), STARTED: END_STATES + (CANCELLING,), CANCELLING: END_STATES, - CANCELLED: PENDING + # Retrying + CANCELLED: PENDING, + FAILED: PENDING } @orm.validates('status') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9eca32a8/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 3d58386..a1c7782 100644 --- a/aria/orchestrator/workflow_runner.py +++ 
b/aria/orchestrator/workflow_runner.py @@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): def __init__(self, model_storage, resource_storage, plugin_manager, - execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None, + execution_id=None, retry_failed=False, + service_id=None, workflow_name=None, inputs=None, executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): """ @@ -61,6 +62,7 @@ class WorkflowRunner(object): "and service id with inputs") self._is_resume = execution_id is not None + self._retry_failed = retry_failed self._model_storage = model_storage self._resource_storage = resource_storage @@ -115,7 +117,8 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume) + self._engine.execute( + ctx=self._workflow_context, resuming=self._is_resume, retry_failing=self._retry_failed) def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9eca32a8/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 5a94df8..c045437 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -41,14 +41,27 @@ class Engine(logger.LoggerMixin): self._executors = executors.copy() self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) - def execute(self, ctx, resuming=False): + def execute(self, ctx, resuming=False, retry_failing=False): """ execute the workflow """ if resuming: events.on_resume_workflow_signal.send(ctx) - tasks_tracker = _TasksTracker(ctx) + tasks_tracker = _TasksTracker( + ctx, + # only succeeded tasks have ended (for retry 
failing) + has_task_ended_func= + lambda t: (t.status == t.SUCCESS or (t.status == t.FAILED and t.ignore_failure)) + if retry_failing else + None, + # failed tasks are waiting to be executed (for retry failing) + is_task_waiting_func= + lambda t: (t.is_waiting() or (t.status == t.FAILED and not t.ignore_failure)) + if retry_failing else + None + ) + try: events.start_workflow_signal.send(ctx) while True: @@ -124,10 +137,13 @@ class Engine(logger.LoggerMixin): class _TasksTracker(object): - def __init__(self, ctx): + def __init__(self, ctx, has_task_ended_func=None, is_task_waiting_func=None): self._ctx = ctx + + has_task_ended = has_task_ended_func or (lambda t: t.has_ended()) + self._is_task_waiting = is_task_waiting_func or (lambda t: t.is_waiting()) self._tasks = ctx.execution.tasks - self._executed_tasks = [task for task in self._tasks if task.has_ended()] + self._executed_tasks = [task for task in self._tasks if has_task_ended(task)] self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) self._executing_tasks = [] @@ -155,8 +171,8 @@ class _TasksTracker(object): def executable_tasks(self): now = datetime.utcnow() # we need both lists since retrying task are in the executing task list. 
- for task in self._update_tasks(self._executing_tasks + self._executable_tasks): - if all([task.is_waiting(), + for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)): + if all([self._is_task_waiting(task), task.due_at <= now, all(dependency in self._executed_tasks for dependency in task.dependencies) ]): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9eca32a8/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index e640c7d..adeb274 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -23,7 +23,7 @@ import pytest from aria.modeling import exceptions as modeling_exceptions from aria.modeling import models from aria.orchestrator import exceptions -from aria.orchestrator.events import on_cancelled_workflow_signal +from aria.orchestrator import events from aria.orchestrator.workflow_runner import WorkflowRunner from aria.orchestrator.workflows.executor.process import ProcessExecutor from aria.orchestrator.workflows import api @@ -46,9 +46,10 @@ from ..fixtures import ( # pylint: disable=unused-import resource_storage as resource ) -events = { +custom_events = { 'is_resumed': Event(), 'is_active': Event(), + 'execution_cancelled': Event(), 'execution_ended': Event() } @@ -318,42 +319,53 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None, class TestResumableWorkflows(object): - def test_resume_workflow(self, workflow_context, executor): - node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) - self._create_interface(workflow_context, node, mock_resuming_task) + def _create_initial_workflow_runner( + self, workflow_context, workflow, executor, inputs=None): service = 
workflow_context.service service.workflows['custom_workflow'] = tests_mock.models.create_operation( 'custom_workflow', - operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)} + operation_kwargs={ + 'function': '{0}.{1}'.format(__name__, workflow.__name__), + 'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items()) + } ) workflow_context.model.service.update(service) wf_runner = WorkflowRunner( service_id=workflow_context.service.id, - inputs={}, + inputs=inputs or {}, model_storage=workflow_context.model, resource_storage=workflow_context.resource, plugin_manager=None, workflow_name='custom_workflow', executor=executor) + return wf_runner + + def test_resume_workflow(self, workflow_context, executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_resuming_task) + + wf_runner = self._create_initial_workflow_runner( + workflow_context, mock_parallel_workflow, executor) + wf_thread = Thread(target=wf_runner.execute) wf_thread.daemon = True wf_thread.start() # Wait for the execution to start - if events['is_active'].wait(5) is False: + if custom_events['is_active'].wait(5) is False: raise TimeoutError("is_active wasn't set to True") wf_runner.cancel() - if events['execution_ended'].wait(60) is False: + if custom_events['execution_cancelled'].wait(60) is False: raise TimeoutError("Execution did not end") tasks = workflow_context.model.task.list(filters={'_stub_type': None}) assert any(task.status == task.SUCCESS for task in tasks) assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks) - events['is_resumed'].set() + custom_events['is_resumed'].set() assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks) # Create a new workflow runner, with an existing execution id. 
This would cause @@ -374,6 +386,49 @@ class TestResumableWorkflows(object): assert node.attributes['invocations'].value == 3 assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + def test_resume_failed_workflow(self, workflow_context, executor): + + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_failed_first_task) + + wf_runner = self._create_initial_workflow_runner( + workflow_context, mock_sequential_workflow, executor) + wf_thread = Thread(target=wf_runner.execute) + wf_thread.daemon = True + wf_thread.start() + + if custom_events['execution_ended'].wait(60) is False: + raise TimeoutError("Execution did not end") + + assert node.attributes['invocations'].value == 1 + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + assert any(task.status == task.SUCCESS for task in tasks) + assert any(task.status == task.FAILED for task in tasks) + assert wf_runner.execution.status == wf_runner.execution.FAILED + + custom_events['is_resumed'].set() + + # Create a new workflow runner, with an existing execution id. This would cause + # the old execution to restart. + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + retry_failed=True, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=executor) + + new_wf_runner.execute() + + # Wait for it to finish and assert changes. 
+ print node.attributes['triggerer'].value + assert node.attributes['invocations'].value == 2 + assert all(task.status == task.SUCCESS for task in tasks) + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + @staticmethod @pytest.fixture def executor(): @@ -417,16 +472,23 @@ class TestResumableWorkflows(object): @pytest.fixture(autouse=True) def register_to_events(self): + def execution_cancelled(*args, **kwargs): + custom_events['execution_cancelled'].set() + def execution_ended(*args, **kwargs): - events['execution_ended'].set() + custom_events['execution_ended'].set() - on_cancelled_workflow_signal.connect(execution_ended) + events.on_cancelled_workflow_signal.connect(execution_cancelled) + events.on_failure_workflow_signal.connect(execution_ended) yield - on_cancelled_workflow_signal.disconnect(execution_ended) + events.on_cancelled_workflow_signal.disconnect(execution_cancelled) + events.on_failure_workflow_signal.disconnect(execution_ended) + for event in custom_events.values(): + event.clear() @workflow -def mock_workflow(ctx, graph): +def mock_parallel_workflow(ctx, graph): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) graph.add_tasks( api.task.OperationTask( @@ -441,8 +503,33 @@ def mock_resuming_task(ctx): ctx.node.attributes['invocations'] += 1 if ctx.node.attributes['invocations'] != 1: - events['is_active'].set() - if not events['is_resumed'].isSet(): + custom_events['is_active'].set() + if not custom_events['is_resumed'].isSet(): # if resume was called, increase by one. 
o/w fail the execution - second task should # fail as long it was not a part of resuming the workflow raise BaseException("wasn't resumed yet") + + +@workflow +def mock_sequential_workflow(ctx, graph): + node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + graph.sequence( + api.task.OperationTask(node, + interface_name='aria.interfaces.lifecycle', + operation_name='create'), + api.task.OperationTask(node, + interface_name='aria.interfaces.lifecycle', + operation_name='create') + ) + + +@operation +def mock_failed_first_task(ctx): + if not custom_events['is_resumed'].isSet(): + if ctx.node.attributes['invocations'] == 1: + # Second task should fail (and be retried later) + ctx.task.abort() + ctx.node.attributes['invocations'] += 1 + if 'triggerer' not in ctx.node.attributes: + ctx.node.attributes['triggerer'] = [] + ctx.node.attributes['triggerer'].append('{0}:{1}'.format(ctx.task.id, ctx.node.attributes['invocations']))
