Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-20-ignore-task-failure e10cc1044 -> b9c135f86 (forced update)
ARIA-20 Add support for ignoring task failures Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b9c135f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b9c135f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b9c135f8 Branch: refs/heads/ARIA-20-ignore-task-failure Commit: b9c135f86feda60c318ae585a1ec4aa0c22a31d8 Parents: eb3dd5d Author: Dan Kilman <[email protected]> Authored: Mon Nov 14 13:24:13 2016 +0200 Committer: Dan Kilman <[email protected]> Committed: Mon Nov 14 13:26:41 2016 +0200 ---------------------------------------------------------------------- aria/context/workflow.py | 2 ++ aria/events/builtin_event_handler.py | 8 +++++++- aria/storage/models.py | 1 + aria/workflows/api/task.py | 3 +++ aria/workflows/core/engine.py | 2 +- aria/workflows/core/task.py | 1 + tests/mock/context.py | 6 ++++-- tests/workflows/api/test_task.py | 8 ++++++-- tests/workflows/core/test_engine.py | 27 +++++++++++++++++++++++++-- 9 files changed, 50 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index fb9c8ee..b84a5fb 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -49,6 +49,7 @@ class WorkflowContext(logger.LoggerMixin): parameters=None, task_max_attempts=1, task_retry_interval=0, + task_ignore_failure=False, **kwargs): super(WorkflowContext, self).__init__(**kwargs) self.name = name @@ -61,6 +62,7 @@ class WorkflowContext(logger.LoggerMixin): self.parameters = parameters or {} self.task_max_attempts = task_max_attempts self.task_retry_interval = task_retry_interval + self.task_ignore_failure = task_ignore_failure # TODO: execution creation should happen somewhere else # should be moved there, when such logical place exists try: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index cba3180..2f9a3be 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -55,7 +55,13 @@ def _task_started(task, *args, **kwargs): @on_failure_task_signal.connect def _task_failed(task, *args, **kwargs): with task.update(): - if task.retry_count < task.max_attempts - 1 or task.max_attempts == task.INFINITE_RETRIES: + should_retry = ( + (task.retry_count < task.max_attempts - 1 or + task.max_attempts == task.INFINITE_RETRIES) and + # ignore_failure check here means the task will not be retries and it will be marked as + # failed. The engine will also look at ignore_failure so it won't fail the workflow. + not task.ignore_failure) + if should_retry: task.status = task.RETRYING task.retry_count += 1 task.due_at = datetime.utcnow() + timedelta(seconds=task.retry_interval) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 23f2408..94a9aa0 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -432,6 +432,7 @@ class Task(Model): max_attempts = Field(type=int, default=1, validation_func=_Validation.validate_max_attempts) retry_count = Field(type=int, default=0) retry_interval = Field(type=(int, float), default=0) + ignore_failure = Field(type=bool, default=False) # Operation specific fields name = Field(type=basestring) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py index bb8e045..7e9b8e0 100644 --- a/aria/workflows/api/task.py +++ b/aria/workflows/api/task.py @@ -60,6 +60,7 @@ class OperationTask(BaseTask): node_instance, max_attempts=None, retry_interval=None, + ignore_failure=None, inputs=None): """ Creates an operation task using the name, details, node instance and any additional kwargs. @@ -77,6 +78,8 @@ class OperationTask(BaseTask): if max_attempts is None else max_attempts) self.retry_interval = (self.workflow_context.task_retry_interval if retry_interval is None else retry_interval) + self.ignore_failure = (self.workflow_context.task_ignore_failure + if ignore_failure is None else ignore_failure) class WorkflowTask(BaseTask): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index b32ef11..83ad097 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -108,7 +108,7 @@ class Engine(logger.LoggerMixin): self._executor.execute(task) def _handle_ended_tasks(self, task): - if task.status == models.Task.FAILED: + if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') else: self._execution_graph.remove_node(task.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py index b90306a..65a8ddb 100644 --- a/aria/workflows/core/task.py +++ b/aria/workflows/core/task.py @@ -98,6 +98,7 @@ class OperationTask(BaseTask, logger.LoggerMixin): execution_id=self.workflow_context.execution_id, max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, + ignore_failure=api_task.ignore_failure ) self.workflow_context.model.task.store(task) self._task_id = task.id http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index 4d218b5..15dbc33 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -19,11 +19,11 @@ from . import models from ..storage import InMemoryModelDriver -def simple(): +def simple(**kwargs): storage = application_model_storage(InMemoryModelDriver()) storage.setup() storage.deployment.store(models.get_deployment()) - return context.workflow.WorkflowContext( + final_kwargs = dict( name='simple_context', model_storage=storage, resource_storage=None, @@ -33,3 +33,5 @@ def simple(): task_max_attempts=models.TASK_MAX_ATTEMPTS, task_retry_interval=models.TASK_RETRY_INTERVAL ) + final_kwargs.update(kwargs) + return context.workflow.WorkflowContext(**final_kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/tests/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py index 857f3bf..50bca45 100644 --- a/tests/workflows/api/test_task.py +++ b/tests/workflows/api/test_task.py @@ -69,6 +69,7 @@ class TestOperationTask(object): inputs = {'inputs': True} max_attempts = 10 retry_interval = 10 + ignore_failure = True with context.workflow.current.push(workflow_context): model_task = api.task.OperationTask(name=name, @@ -76,7 +77,8 @@ class TestOperationTask(object): node_instance=node_instance, inputs=inputs, max_attempts=max_attempts, - retry_interval=retry_interval) + retry_interval=retry_interval, + ignore_failure=ignore_failure) assert model_task.name == name assert model_task.operation_details == op_details @@ -84,9 +86,10 @@ class TestOperationTask(object): assert model_task.inputs == inputs assert model_task.retry_interval == retry_interval assert model_task.max_attempts == max_attempts + assert model_task.ignore_failure == ignore_failure def test_operation_task_default_values(self): - workflow_context = mock.context.simple() + workflow_context = mock.context.simple(task_ignore_failure=True) with context.workflow.current.push(workflow_context): model_task = api.task.OperationTask( name='stub', @@ -96,6 +99,7 @@ class TestOperationTask(object): assert model_task.inputs == {} assert model_task.retry_interval == workflow_context.task_retry_interval assert model_task.max_attempts == workflow_context.task_max_attempts + assert model_task.ignore_failure == workflow_context.task_ignore_failure class TestWorkflowTask(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py index 744e155..6377a69 100644 --- a/tests/workflows/core/test_engine.py +++ b/tests/workflows/core/test_engine.py @@ -54,7 +54,11 @@ class BaseTest(object): tasks_graph=graph) @staticmethod - def _op(func, ctx, inputs=None, max_attempts=None, retry_interval=None): + def _op(func, ctx, + inputs=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): return api.task.OperationTask( name='task', operation_details={'operation': 'tests.workflows.core.test_engine.{name}'.format( @@ -62,7 +66,8 @@ class BaseTest(object): node_instance=ctx.model.node_instance.get('dependency_node_instance'), inputs=inputs, max_attempts=max_attempts, - retry_interval=retry_interval + retry_interval=retry_interval, + ignore_failure=ignore_failure ) @pytest.fixture(scope='function', autouse=True) @@ -372,6 +377,24 @@ class TestRetries(BaseTest): assert invocation2 - invocation1 >= retry_interval assert global_test_holder.get('sent_task_signal_calls') == 2 + def test_ignore_failure(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(mock_conditional_failure_task, ctx, + ignore_failure=True, + inputs={'failure_count': 100}, + max_attempts=100) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + def mock_success_task(): pass
