Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 691334ef9 -> cd91d0124
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/cd91d012 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/cd91d012 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/cd91d012 Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: cd91d0124d3c51dafaa394a404150a52d9b7b22f Parents: 691334e Author: max-orlov <[email protected]> Authored: Tue Jul 4 18:43:35 2017 +0300 Committer: max-orlov <[email protected]> Committed: Tue Jul 4 18:43:35 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/core/engine.py | 10 ++--- tests/orchestrator/test_workflow_runner.py | 53 ++++++++++++++----------- 2 files changed, 35 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/cd91d012/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 151c3ff..cb98b82 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -52,12 +52,12 @@ class Engine(logger.LoggerMixin): ctx, # only succeeded tasks have ended (for retry failing) has_task_ended_func= - lambda t: (t.status == t.SUCCESS or (t.status == t.FAILED and t.ignore_failure)) + (lambda t: (t.status == t.SUCCESS or (t.status == t.FAILED and t.ignore_failure))) if retry_failing else None, # failed tasks are waiting to be executed (for retry failing) is_task_waiting_func= - lambda t: (t.is_waiting() or (t.FAILED and not t.ignore_failure)) + (lambda t: t.is_waiting()) if retry_failing else None ) @@ -140,10 +140,10 @@ class _TasksTracker(object): def __init__(self, ctx, has_task_ended_func=None, is_task_waiting_func=None): self._ctx = ctx - has_task_ended = has_task_ended_func or (lambda t: t.has_ended()) + self._has_task_ended = has_task_ended_func or (lambda t: t.has_ended()) self._is_task_waiting = is_task_waiting_func or (lambda t: t.is_waiting()) self._tasks = ctx.execution.tasks - self._executed_tasks = [task for task in self._tasks if has_task_ended(task)] + self._executed_tasks = [task for task in self._tasks if self._has_task_ended(task)] self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) self._executing_tasks = [] @@ -164,7 +164,7 @@ class _TasksTracker(object): @property def ended_tasks(self): for task in self.executing_tasks: - if task.has_ended(): + if self._has_task_ended(task): yield task @property http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/cd91d012/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index adeb274..d3fdeae 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -14,6 +14,7 @@ # limitations under the License. import json +import time from threading import Thread, Event from datetime import datetime @@ -389,7 +390,6 @@ class TestResumableWorkflows(object): def test_resume_failed_workflow(self, workflow_context, executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_failed_first_task) wf_runner = self._create_initial_workflow_runner( @@ -398,14 +398,17 @@ class TestResumableWorkflows(object): wf_thread.daemon = True wf_thread.start() - if custom_events['execution_ended'].wait(60) is False: + if custom_events['is_active'].wait(60) is False: + raise TimeoutError("Execution did not end") + wf_runner.cancel() + if custom_events['execution_cancelled'].wait(60) is False: raise TimeoutError("Execution did not end") - assert node.attributes['invocations'].value == 1 - tasks = workflow_context.model.task.list(filters={'_stub_type': None}) - assert any(task.status == task.SUCCESS for task in tasks) - assert any(task.status == task.FAILED for task in tasks) - assert wf_runner.execution.status == wf_runner.execution.FAILED + task = workflow_context.model.task.list(filters={'_stub_type': None})[0] + assert task.attempts_count == 2 + assert task.status in (task.STARTED, task.RETRYING) + assert wf_runner.execution.status in (wf_runner.execution.CANCELLED, + wf_runner.execution.CANCELLING) custom_events['is_resumed'].set() @@ -424,10 +427,9 @@ class TestResumableWorkflows(object): new_wf_runner.execute() # Wait for it to finish and assert changes. - print node.attributes['triggerer'].value - assert node.attributes['invocations'].value == 2 - assert all(task.status == task.SUCCESS for task in tasks) - assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + assert node.attributes['invocations'].value == workflow_context._task_max_attempts + assert task.status == task.FAILED + assert wf_runner.execution.status == wf_runner.execution.FAILED @staticmethod @pytest.fixture @@ -516,20 +518,25 @@ def mock_sequential_workflow(ctx, graph): graph.sequence( api.task.OperationTask(node, interface_name='aria.interfaces.lifecycle', - operation_name='create'), - api.task.OperationTask(node, - interface_name='aria.interfaces.lifecycle', - operation_name='create') + operation_name='create', + retry_interval=1, + max_attempts=5), ) @operation def mock_failed_first_task(ctx): - if not custom_events['is_resumed'].isSet(): - if ctx.node.attributes['invocations'] == 1: - # Second task should fail (an be retried later) - ctx.task.abort() - ctx.node.attributes['invocations'] += 1 - if 'triggerer' not in ctx.node.attributes: - ctx.node.attributes['triggerer'] = [] - ctx.node.attributes['triggerer'].append('{0}:{1}'.format(ctx.task.id, ctx.node.attributes['invocations'])) + """ + The task runs for 10 times. then it sleeps waiting for cancellation. upon resume + :param ctx: + :return: + """ + + if ctx.task.attempts_count == 2: + custom_events['is_active'].set() + while True: + pass + + if not custom_events['is_resumed'].set(): + raise BaseException("stop this task") +
