ARIA-20 Add support for ignoring task failures

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b9c135f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b9c135f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b9c135f8

Branch: refs/heads/ARIA-18-migrate-tosca-parser
Commit: b9c135f86feda60c318ae585a1ec4aa0c22a31d8
Parents: eb3dd5d
Author: Dan Kilman <[email protected]>
Authored: Mon Nov 14 13:24:13 2016 +0200
Committer: Dan Kilman <[email protected]>
Committed: Mon Nov 14 13:26:41 2016 +0200

----------------------------------------------------------------------
 aria/context/workflow.py             |  2 ++
 aria/events/builtin_event_handler.py |  8 +++++++-
 aria/storage/models.py               |  1 +
 aria/workflows/api/task.py           |  3 +++
 aria/workflows/core/engine.py        |  2 +-
 aria/workflows/core/task.py          |  1 +
 tests/mock/context.py                |  6 ++++--
 tests/workflows/api/test_task.py     |  8 ++++++--
 tests/workflows/core/test_engine.py  | 27 +++++++++++++++++++++++++--
 9 files changed, 50 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
index fb9c8ee..b84a5fb 100644
--- a/aria/context/workflow.py
+++ b/aria/context/workflow.py
@@ -49,6 +49,7 @@ class WorkflowContext(logger.LoggerMixin):
             parameters=None,
             task_max_attempts=1,
             task_retry_interval=0,
+            task_ignore_failure=False,
             **kwargs):
         super(WorkflowContext, self).__init__(**kwargs)
         self.name = name
@@ -61,6 +62,7 @@ class WorkflowContext(logger.LoggerMixin):
         self.parameters = parameters or {}
         self.task_max_attempts = task_max_attempts
         self.task_retry_interval = task_retry_interval
+        self.task_ignore_failure = task_ignore_failure
         # TODO: execution creation should happen somewhere else
         # should be moved there, when such logical place exists
         try:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py 
b/aria/events/builtin_event_handler.py
index cba3180..2f9a3be 100644
--- a/aria/events/builtin_event_handler.py
+++ b/aria/events/builtin_event_handler.py
@@ -55,7 +55,13 @@ def _task_started(task, *args, **kwargs):
 @on_failure_task_signal.connect
 def _task_failed(task, *args, **kwargs):
     with task.update():
-        if task.retry_count < task.max_attempts - 1 or task.max_attempts == 
task.INFINITE_RETRIES:
+        should_retry = (
+            (task.retry_count < task.max_attempts - 1 or
+             task.max_attempts == task.INFINITE_RETRIES) and
+            # ignore_failure check here means the task will not be retries and 
it will be marked as
+            # failed. The engine will also look at ignore_failure so it won't 
fail the workflow.
+            not task.ignore_failure)
+        if should_retry:
             task.status = task.RETRYING
             task.retry_count += 1
             task.due_at = datetime.utcnow() + 
timedelta(seconds=task.retry_interval)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 23f2408..94a9aa0 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -432,6 +432,7 @@ class Task(Model):
     max_attempts = Field(type=int, default=1, 
validation_func=_Validation.validate_max_attempts)
     retry_count = Field(type=int, default=0)
     retry_interval = Field(type=(int, float), default=0)
+    ignore_failure = Field(type=bool, default=False)
 
     # Operation specific fields
     name = Field(type=basestring)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py
index bb8e045..7e9b8e0 100644
--- a/aria/workflows/api/task.py
+++ b/aria/workflows/api/task.py
@@ -60,6 +60,7 @@ class OperationTask(BaseTask):
                  node_instance,
                  max_attempts=None,
                  retry_interval=None,
+                 ignore_failure=None,
                  inputs=None):
         """
         Creates an operation task using the name, details, node instance and 
any additional kwargs.
@@ -77,6 +78,8 @@ class OperationTask(BaseTask):
                              if max_attempts is None else max_attempts)
         self.retry_interval = (self.workflow_context.task_retry_interval
                                if retry_interval is None else retry_interval)
+        self.ignore_failure = (self.workflow_context.task_ignore_failure
+                               if ignore_failure is None else ignore_failure)
 
 
 class WorkflowTask(BaseTask):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index b32ef11..83ad097 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -108,7 +108,7 @@ class Engine(logger.LoggerMixin):
             self._executor.execute(task)
 
     def _handle_ended_tasks(self, task):
-        if task.status == models.Task.FAILED:
+        if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
             self._execution_graph.remove_node(task.id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/aria/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py
index b90306a..65a8ddb 100644
--- a/aria/workflows/core/task.py
+++ b/aria/workflows/core/task.py
@@ -98,6 +98,7 @@ class OperationTask(BaseTask, logger.LoggerMixin):
             execution_id=self.workflow_context.execution_id,
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
+            ignore_failure=api_task.ignore_failure
         )
         self.workflow_context.model.task.store(task)
         self._task_id = task.id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 4d218b5..15dbc33 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -19,11 +19,11 @@ from . import models
 from ..storage import InMemoryModelDriver
 
 
-def simple():
+def simple(**kwargs):
     storage = application_model_storage(InMemoryModelDriver())
     storage.setup()
     storage.deployment.store(models.get_deployment())
-    return context.workflow.WorkflowContext(
+    final_kwargs = dict(
         name='simple_context',
         model_storage=storage,
         resource_storage=None,
@@ -33,3 +33,5 @@ def simple():
         task_max_attempts=models.TASK_MAX_ATTEMPTS,
         task_retry_interval=models.TASK_RETRY_INTERVAL
     )
+    final_kwargs.update(kwargs)
+    return context.workflow.WorkflowContext(**final_kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/tests/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py
index 857f3bf..50bca45 100644
--- a/tests/workflows/api/test_task.py
+++ b/tests/workflows/api/test_task.py
@@ -69,6 +69,7 @@ class TestOperationTask(object):
         inputs = {'inputs': True}
         max_attempts = 10
         retry_interval = 10
+        ignore_failure = True
 
         with context.workflow.current.push(workflow_context):
             model_task = api.task.OperationTask(name=name,
@@ -76,7 +77,8 @@ class TestOperationTask(object):
                                                 node_instance=node_instance,
                                                 inputs=inputs,
                                                 max_attempts=max_attempts,
-                                                retry_interval=retry_interval)
+                                                retry_interval=retry_interval,
+                                                ignore_failure=ignore_failure)
 
         assert model_task.name == name
         assert model_task.operation_details == op_details
@@ -84,9 +86,10 @@ class TestOperationTask(object):
         assert model_task.inputs == inputs
         assert model_task.retry_interval == retry_interval
         assert model_task.max_attempts == max_attempts
+        assert model_task.ignore_failure == ignore_failure
 
     def test_operation_task_default_values(self):
-        workflow_context = mock.context.simple()
+        workflow_context = mock.context.simple(task_ignore_failure=True)
         with context.workflow.current.push(workflow_context):
             model_task = api.task.OperationTask(
                 name='stub',
@@ -96,6 +99,7 @@ class TestOperationTask(object):
         assert model_task.inputs == {}
         assert model_task.retry_interval == 
workflow_context.task_retry_interval
         assert model_task.max_attempts == workflow_context.task_max_attempts
+        assert model_task.ignore_failure == 
workflow_context.task_ignore_failure
 
 
 class TestWorkflowTask(object):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9c135f8/tests/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_engine.py 
b/tests/workflows/core/test_engine.py
index 744e155..6377a69 100644
--- a/tests/workflows/core/test_engine.py
+++ b/tests/workflows/core/test_engine.py
@@ -54,7 +54,11 @@ class BaseTest(object):
                              tasks_graph=graph)
 
     @staticmethod
-    def _op(func, ctx, inputs=None, max_attempts=None, retry_interval=None):
+    def _op(func, ctx,
+            inputs=None,
+            max_attempts=None,
+            retry_interval=None,
+            ignore_failure=None):
         return api.task.OperationTask(
             name='task',
             operation_details={'operation': 
'tests.workflows.core.test_engine.{name}'.format(
@@ -62,7 +66,8 @@ class BaseTest(object):
             
node_instance=ctx.model.node_instance.get('dependency_node_instance'),
             inputs=inputs,
             max_attempts=max_attempts,
-            retry_interval=retry_interval
+            retry_interval=retry_interval,
+            ignore_failure=ignore_failure
         )
 
     @pytest.fixture(scope='function', autouse=True)
@@ -372,6 +377,24 @@ class TestRetries(BaseTest):
         assert invocation2 - invocation1 >= retry_interval
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
+    def test_ignore_failure(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_conditional_failure_task, ctx,
+                          ignore_failure=True,
+                          inputs={'failure_count': 100},
+                          max_attempts=100)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 1
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
 
 def mock_success_task():
     pass

Reply via email to