Repository: incubator-ariatosca
Updated Branches:
  refs/heads/master 12e175b42 -> 8a00b5fce


ARIA-55 Implement task retry and abort mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8a00b5fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8a00b5fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8a00b5fc

Branch: refs/heads/master
Commit: 8a00b5fce2c737058f10c1e3d3a92bdcfe53e882
Parents: 12e175b
Author: Dan Kilman <[email protected]>
Authored: Thu Jan 5 13:00:01 2017 +0200
Committer: Dan Kilman <[email protected]>
Committed: Thu Jan 5 13:21:35 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/exceptions.py                 | 16 ++++
 .../workflows/core/events_handler.py            | 25 ++++--
 aria/storage/base_model.py                      | 10 ++-
 .../orchestrator/workflows/core/test_engine.py  | 94 +++++++++++++++++++-
 .../workflows/executor/test_executor.py         |  1 +
 .../workflows/executor/test_process_executor.py |  1 +
 6 files changed, 136 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8a00b5fc/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 74e9002..bd5238e 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -30,3 +30,19 @@ class PluginAlreadyExistsError(AriaError):
     Raised when a plugin with the same package name and package version already exists
     """
     pass
+
+
+class TaskRetryException(RuntimeError):
+    """
+    Used internally when ctx.task.retry is called
+    """
+    def __init__(self, message, retry_interval):
+        super(TaskRetryException, self).__init__(message)
+        self.retry_interval = retry_interval
+
+
+class TaskAbortException(RuntimeError):
+    """
+    Used internally when ctx.task.abort is called
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8a00b5fc/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index d05cbcb..c973ad9 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -27,7 +27,7 @@ from datetime import (
 )
 
 from ... import events
-
+from ... import exceptions
 
 @events.sent_task_signal.connect
 def _task_sent(task, *args, **kwargs):
@@ -43,18 +43,25 @@ def _task_started(task, *args, **kwargs):
 
 
 @events.on_failure_task_signal.connect
-def _task_failed(task, *args, **kwargs):
+def _task_failed(task, exception, *args, **kwargs):
     with task._update():
-        should_retry = (
-            (task.retry_count < task.max_attempts - 1 or
-             task.max_attempts == task.INFINITE_RETRIES) and
-            # ignore_failure check here means the task will not be retries and it will be marked as
-            # failed. The engine will also look at ignore_failure so it won't fail the workflow.
-            not task.ignore_failure)
+        should_retry = all([
+            not isinstance(exception, exceptions.TaskAbortException),
+            task.retry_count < task.max_attempts - 1 or task.max_attempts == task.INFINITE_RETRIES,
+            # ignore_failure check here means the task will not be retried and it will be marked
+            # as failed. The engine will also look at ignore_failure so it won't fail the
+            # workflow.
+            not task.ignore_failure
+        ])
         if should_retry:
+            retry_interval = None
+            if isinstance(exception, exceptions.TaskRetryException):
+                retry_interval = exception.retry_interval
+            if retry_interval is None:
+                retry_interval = task.retry_interval
             task.status = task.RETRYING
             task.retry_count += 1
-            task.due_at = datetime.utcnow() + timedelta(seconds=task.retry_interval)
+            task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
         else:
             task.ended_at = datetime.utcnow()
             task.status = task.FAILED

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8a00b5fc/aria/storage/base_model.py
----------------------------------------------------------------------
diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py
index d1aebf2..97c541c 100644
--- a/aria/storage/base_model.py
+++ b/aria/storage/base_model.py
@@ -52,8 +52,8 @@ from sqlalchemy import (
     orm,
 )
 
+from ..orchestrator.exceptions import TaskAbortException, TaskRetryException
 from .structure import ModelMixin
-
 from .type import (
     List,
     Dict
@@ -675,3 +675,11 @@ class TaskBase(ModelMixin):
     @classmethod
     def as_relationship_instance(cls, instance, **kwargs):
         return cls(relationship_instance=instance, **kwargs)
+
+    @staticmethod
+    def abort(message=None):
+        raise TaskAbortException(message)
+
+    @staticmethod
+    def retry(message=None, retry_interval=None):
+        raise TaskRetryException(message, retry_interval=retry_interval)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8a00b5fc/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index a6b55ba..d9b50a9 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -382,6 +382,78 @@ class TestRetries(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 1
 
 
+class TestTaskRetryAndAbort(BaseTest):
+    message = 'EXPECTED_ERROR'
+
+    def test_task_retry_default_interval(self, workflow_context, executor):
+        default_retry_interval = 0.1
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_task_retry, ctx,
+                          inputs={'message': self.message},
+                          retry_interval=default_retry_interval,
+                          max_attempts=2)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        invocation1, invocation2 = invocations
+        assert invocation2 - invocation1 >= default_retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_task_retry_custom_interval(self, workflow_context, executor):
+        default_retry_interval = 100
+        custom_retry_interval = 0.1
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_task_retry, ctx,
+                          inputs={'message': self.message,
+                                  'retry_interval': custom_retry_interval},
+                          retry_interval=default_retry_interval,
+                          max_attempts=2)
+            graph.add_tasks(op)
+        execution_start = time.time()
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        execution_end = time.time()
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        assert (execution_end - execution_start) < default_retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_task_abort(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_task_abort, ctx,
+                          inputs={'message': self.message},
+                          retry_interval=100,
+                          max_attempts=100)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 1
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
 @operation
 def mock_success_task(**_):
     pass
@@ -408,7 +480,27 @@ def mock_conditional_failure_task(failure_count, **_):
         invocations.append(time.time())
 
 
+@operation
 def mock_sleep_task(seconds, **_):
+    _add_invocation_timestamp()
+    time.sleep(seconds)
+
+
+@operation
+def mock_task_retry(ctx, message, retry_interval=None, **_):
+    _add_invocation_timestamp()
+    retry_kwargs = {}
+    if retry_interval is not None:
+        retry_kwargs['retry_interval'] = retry_interval
+    ctx.task.retry(message, **retry_kwargs)
+
+
+@operation
+def mock_task_abort(ctx, message, **_):
+    _add_invocation_timestamp()
+    ctx.task.abort(message)
+
+
+def _add_invocation_timestamp():
     invocations = global_test_holder.setdefault('invocations', [])
     invocations.append(time.time())
-    time.sleep(seconds)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8a00b5fc/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index cd00cd5..2486a1e 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -107,6 +107,7 @@ class MockTask(object):
         self.retry_count = 0
         self.max_attempts = 1
         self.plugin_fk = None
+        self.ignore_failure = False
 
         for state in model.Task.STATES:
             setattr(self, state.upper(), state)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8a00b5fc/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index e321388..687e245 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -130,6 +130,7 @@ class MockTask(object):
         self.max_attempts = 1
         self.plugin_fk = plugin.id
         self.plugin = plugin
+        self.ignore_failure = False
 
         for state in aria_model.Task.STATES:
             setattr(self, state.upper(), state)

Reply via email to