Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-10-task-retries 0b3c85d8a -> feacd9f66 (forced update)


ARIA-10 Implement task retries mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/feacd9f6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/feacd9f6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/feacd9f6

Branch: refs/heads/ARIA-10-task-retries
Commit: feacd9f664cb068b7ebeebad3dda51a941889de3
Parents: 8947f72
Author: Dan Kilman <[email protected]>
Authored: Tue Nov 8 11:03:08 2016 +0200
Committer: Dan Kilman <[email protected]>
Committed: Tue Nov 8 16:34:14 2016 +0200

----------------------------------------------------------------------
 .gitignore                                |   2 +-
 aria/context/workflow.py                  |   5 +-
 aria/events/builtin_event_handler.py      |  14 +-
 aria/storage/models.py                    |   6 +-
 aria/storage/structures.py                |  10 +-
 aria/tools/application.py                 |   6 +-
 aria/workflows/api/task.py                |   8 +-
 aria/workflows/core/engine.py             |   6 +-
 aria/workflows/core/task.py               |  52 ++--
 aria/workflows/core/translation.py        |  21 +-
 tests/mock/context.py                     |   4 +-
 tests/mock/models.py                      |   6 +-
 tests/requirements.txt                    |   2 +-
 tests/storage/test_models.py              |  26 +-
 tests/workflows/api/__init__.py           |   1 -
 tests/workflows/api/test_task.py          |  20 +-
 tests/workflows/core/test_engine.py       | 322 +++++++++++++++++++++++++
 tests/workflows/core/test_executor.py     | 136 -----------
 tests/workflows/executor/__init__.py      |  14 ++
 tests/workflows/executor/test_executor.py | 138 +++++++++++
 tests/workflows/test_engine.py            | 187 --------------
 21 files changed, 596 insertions(+), 390 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/.gitignore
----------------------------------------------------------------------
diff --git a/.gitignore b/.gitignore
index 29c4e9c..482383a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -29,7 +29,7 @@ pip-delete-this-directory.txt
 # Unit test / coverage reports
 htmlcov/
 .tox/
-.coverage
+.coverage*
 .cache
 nosetests.xml
 coverage.xml

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
index 8183d42..44db38d 100644
--- a/aria/context/workflow.py
+++ b/aria/context/workflow.py
@@ -47,6 +47,8 @@ class WorkflowContext(logger.LoggerMixin):
             workflow_id,
             execution_id=None,
             parameters=None,
+            task_max_retries=0,
+            task_retry_interval=0,
             **kwargs):
         super(WorkflowContext, self).__init__(**kwargs)
         self.name = name
@@ -57,6 +59,8 @@ class WorkflowContext(logger.LoggerMixin):
         self.workflow_id = workflow_id
         self.execution_id = execution_id or str(uuid4())
         self.parameters = parameters or {}
+        self.task_max_retries = task_max_retries
+        self.task_retry_interval = task_retry_interval
 
     def __repr__(self):
         return (
@@ -187,4 +191,3 @@ class _CurrentContext(threading.local):
             self._set(prev_workflow_context)
 
 current = _CurrentContext()
-

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py 
b/aria/events/builtin_event_handler.py
index 2abdd9f..280785e 100644
--- a/aria/events/builtin_event_handler.py
+++ b/aria/events/builtin_event_handler.py
@@ -21,7 +21,10 @@ Implementation of storage handlers for workflow and 
operation events.
 """
 
 
-from datetime import datetime
+from datetime import (
+    datetime,
+    timedelta,
+)
 
 from . import (
     start_workflow_signal,
@@ -50,8 +53,13 @@ def _task_started(task, *args, **kwargs):
 @on_failure_task_signal.connect
 def _task_failed(task, *args, **kwargs):
     with task.update():
-        task.ended_at = datetime.utcnow()
-        task.status = task.FAILED
+        if task.retry_count < task.max_retries:
+            task.status = task.RETRYING
+            task.retry_count += 1
+            task.eta = datetime.utcnow() + 
timedelta(seconds=task.retry_interval)
+        else:
+            task.ended_at = datetime.utcnow()
+            task.status = task.FAILED
 
 
 @on_success_task_signal.connect

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 9aa7cf0..e74d733 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -376,27 +376,31 @@ class Task(Model):
     A Model which represents a task
     """
     PENDING = 'pending'
+    RETRYING = 'retrying'
     SENT = 'sent'
     STARTED = 'started'
     SUCCESS = 'success'
     FAILED = 'failed'
     STATES = (
         PENDING,
+        RETRYING,
         SENT,
         STARTED,
         SUCCESS,
         FAILED,
     )
+    WAIT_STATES = [PENDING, RETRYING]
     END_STATES = [SUCCESS, FAILED]
 
     id = Field(type=basestring, default=uuid_generator)
     status = Field(type=basestring, choices=STATES, default=PENDING)
     execution_id = Field(type=basestring)
-    eta = Field(type=datetime, default=datetime.now)
+    eta = Field(type=datetime, default=datetime.utcnow)
     started_at = Field(type=datetime, default=None)
     ended_at = Field(type=datetime, default=None)
     max_retries = Field(type=int, default=1)
     retry_count = Field(type=int, default=0)
+    retry_interval = Field(type=(int, float), default=0)
 
     # Operation specific fields
     name = Field(type=basestring)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/storage/structures.py
----------------------------------------------------------------------
diff --git a/aria/storage/structures.py b/aria/storage/structures.py
index ea4cf3a..c692d36 100644
--- a/aria/storage/structures.py
+++ b/aria/storage/structures.py
@@ -195,17 +195,17 @@ class Model(object):
         :param fields: each item is validated and transformed into instance 
attributes.
         """
         self._assert_model_have_id_field(**fields)
-        missing_fileds, unexpected_fileds = self._setup_fields(fields)
+        missing_fields, unexpected_fields = self._setup_fields(fields)
 
-        if missing_fileds:
+        if missing_fields:
             raise StorageError(
                 'Model {name} got missing keyword arguments: {fields}'.format(
-                    name=self.__class__.__name__, fields=missing_fileds))
+                    name=self.__class__.__name__, fields=missing_fields))
 
-        if unexpected_fileds:
+        if unexpected_fields:
             raise StorageError(
                 'Model {name} got unexpected keyword arguments: 
{fields}'.format(
-                    name=self.__class__.__name__, fields=unexpected_fileds))
+                    name=self.__class__.__name__, fields=unexpected_fields))
 
     def __repr__(self):
         return '{name}(fields={0})'.format(sorted(self.fields), 
name=self.__class__.__name__)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/tools/application.py
----------------------------------------------------------------------
diff --git a/aria/tools/application.py b/aria/tools/application.py
index ddc1317..360ba33 100644
--- a/aria/tools/application.py
+++ b/aria/tools/application.py
@@ -108,7 +108,7 @@ class StorageManager(LoggerMixin):
         self.logger.debug('created blueprint resource storage entry')
 
         self.logger.debug('creating blueprint model storage entry')
-        now = datetime.now()
+        now = datetime.utcnow()
         blueprint = self.model_storage.blueprint.model_cls(
             plan=self.blueprint_plan,
             id=self.blueprint_id,
@@ -175,7 +175,7 @@ class StorageManager(LoggerMixin):
         self.logger.debug('created deployment resource storage entry')
 
         self.logger.debug('creating deployment model storage entry')
-        now = datetime.now()
+        now = datetime.utcnow()
         deployment = self.model_storage.deployment.model_cls(
             id=self.deployment_id,
             blueprint_id=self.blueprint_id,
@@ -241,7 +241,7 @@ class StorageManager(LoggerMixin):
         self.logger.debug('creating plugin model storage entry')
         plugin = _load_plugin_from_archive(source)
         build_props = plugin.get('build_server_os_properties')
-        now = datetime.now()
+        now = datetime.utcnow()
 
         plugin = self.model_storage.plugin.model_cls(
             id=plugin_id,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py
index 1070d99..ca62cf7 100644
--- a/aria/workflows/api/task.py
+++ b/aria/workflows/api/task.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 """
-Provides the tasks to be enterd into the task graph
+Provides the tasks to be entered into the task graph
 """
 from uuid import uuid4
 
@@ -58,6 +58,8 @@ class OperationTask(BaseTask):
                  name,
                  operation_details,
                  node_instance,
+                 max_retries=None,
+                 retry_interval=None,
                  inputs=None):
         """
         Creates an operation task using the name, details, node instance and 
any additional kwargs.
@@ -71,6 +73,10 @@ class OperationTask(BaseTask):
         self.operation_details = operation_details
         self.node_instance = node_instance
         self.inputs = inputs or {}
+        self.max_retries = (self.workflow_context.task_max_retries
+                            if max_retries is None else max_retries)
+        self.retry_interval = (self.workflow_context.task_retry_interval
+                               if retry_interval is None else retry_interval)
 
 
 class WorkflowTask(BaseTask):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index a288757..65e17c3 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -63,9 +63,9 @@ class Engine(logger.LoggerMixin):
             raise
 
     def _executable_tasks(self):
-        now = datetime.now()
+        now = datetime.utcnow()
         return (task for task in self._tasks_iter()
-                if task.status == models.Task.PENDING and
+                if task.status in models.Task.WAIT_STATES and
                 task.eta <= now and
                 not self._task_has_dependencies(task))
 
@@ -82,7 +82,7 @@ class Engine(logger.LoggerMixin):
         return (data['task'] for _, data in 
self._execution_graph.nodes_iter(data=True))
 
     def _handle_executable_task(self, task):
-        if isinstance(task, engine_task.BaseWorkflowTask):
+        if isinstance(task, engine_task.StubTask):
             task.status = models.Task.SUCCESS
         else:
             events.sent_task_signal.send(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py
index fc72b59..1cded79 100644
--- a/aria/workflows/core/task.py
+++ b/aria/workflows/core/task.py
@@ -41,39 +41,39 @@ class BaseTask(logger.LoggerMixin):
         return self._id
 
 
-class BaseWorkflowTask(BaseTask):
+class StubTask(BaseTask):
     """
-    Base class for all workflow wrapping tasks
+    Base stub task for all tasks that don't actually run anything
     """
 
     def __init__(self, *args, **kwargs):
-        super(BaseWorkflowTask, self).__init__(*args, **kwargs)
+        super(StubTask, self).__init__(*args, **kwargs)
         self.status = models.Task.PENDING
-        self.eta = datetime.now()
+        self.eta = datetime.utcnow()
 
 
-class StartWorkflowTask(BaseWorkflowTask):
+class StartWorkflowTask(StubTask):
     """
     Tasks marking a workflow start
     """
     pass
 
 
-class EndWorkflowTask(BaseWorkflowTask):
+class EndWorkflowTask(StubTask):
     """
     Tasks marking a workflow end
     """
     pass
 
 
-class StartSubWorkflowTask(BaseWorkflowTask):
+class StartSubWorkflowTask(StubTask):
     """
     Tasks marking a subworkflow start
     """
     pass
 
 
-class EndSubWorkflowTask(BaseWorkflowTask):
+class EndSubWorkflowTask(StubTask):
     """
     Tasks marking a subworkflow end
     """
@@ -96,7 +96,8 @@ class OperationTask(BaseTask, logger.LoggerMixin):
             inputs=api_task.inputs,
             status=task_model.PENDING,
             execution_id=self.workflow_context.execution_id,
-            max_retries=self.workflow_context.parameters.get('max_retries', 1),
+            max_retries=api_task.max_retries,
+            retry_interval=api_task.retry_interval,
         )
         self.workflow_context.model.task.store(task)
         self._task_id = task.id
@@ -147,9 +148,7 @@ class OperationTask(BaseTask, logger.LoggerMixin):
 
     @status.setter
     def status(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
-        self._update_fields['status'] = value
+        self._update_property('status', value)
 
     @property
     def started_at(self):
@@ -161,9 +160,7 @@ class OperationTask(BaseTask, logger.LoggerMixin):
 
     @started_at.setter
     def started_at(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
-        self._update_fields['started_at'] = value
+        self._update_property('started_at', value)
 
     @property
     def ended_at(self):
@@ -175,9 +172,7 @@ class OperationTask(BaseTask, logger.LoggerMixin):
 
     @ended_at.setter
     def ended_at(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
-        self._update_fields['ended_at'] = value
+        self._update_property('ended_at', value)
 
     @property
     def retry_count(self):
@@ -189,12 +184,27 @@ class OperationTask(BaseTask, logger.LoggerMixin):
 
     @retry_count.setter
     def retry_count(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
-        self._update_fields['retry_count'] = value
+        self._update_property('retry_count', value)
+
+    @property
+    def eta(self):
+        """
+        Returns the minimum datetime in which the task can be executed
+        :return: eta
+        """
+        return self.context.eta
+
+    @eta.setter
+    def eta(self, value):
+        self._update_property('eta', value)
 
     def __getattr__(self, attr):
         try:
             return getattr(self.context, attr)
         except AttributeError:
             return super(OperationTask, self).__getattribute__(attr)
+
+    def _update_property(self, key, value):
+        if self._update_fields is None:
+            raise exceptions.TaskException("Task is not in update mode")
+        self._update_fields[key] = value

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/aria/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/translation.py 
b/aria/workflows/core/translation.py
index cd9a0e6..b6cbdad 100644
--- a/aria/workflows/core/translation.py
+++ b/aria/workflows/core/translation.py
@@ -17,8 +17,8 @@
 Translation of user graph's API to the execution graph
 """
 
-from . import task as core_task
 from .. import api
+from . import task as core_task
 
 
 def build_execution_graph(
@@ -47,12 +47,12 @@ def build_execution_graph(
             dependencies,
             default=[start_task])
 
-        if _is_operation(api_task):
+        if isinstance(api_task, api.task.OperationTask):
             # Add the task and the dependencies
             operation_task = core_task.OperationTask(api_task)
             _add_task_and_dependencies(execution_graph, operation_task, 
operation_dependencies)
-        else:
-            # Built the graph recursively while adding start and end markers
+        elif isinstance(api_task, api.task.WorkflowTask):
+            # Build the graph recursively while adding start and end markers
             build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
@@ -60,6 +60,11 @@ def build_execution_graph(
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
+        elif isinstance(api_task, api.task.StubTask):
+            stub_task = core_task.StubTask(id=api_task.id)
+            _add_task_and_dependencies(execution_graph, stub_task, 
operation_dependencies)
+        else:
+            raise RuntimeError('Undefined state')
 
     # Insert end marker
     workflow_dependencies = _get_tasks_from_dependencies(
@@ -80,15 +85,13 @@ def _get_tasks_from_dependencies(execution_graph, 
dependencies, default=()):
     """
     Returns task list from dependencies.
     """
-    return [execution_graph.node[dependency.id if _is_operation(dependency)
+    return [execution_graph.node[dependency.id
+                                 if isinstance(dependency, 
(api.task.OperationTask,
+                                                            api.task.StubTask))
                                  else _end_graph_suffix(dependency.id)]['task']
             for dependency in dependencies] or default
 
 
-def _is_operation(task):
-    return isinstance(task, api.task.OperationTask)
-
-
 def _start_graph_suffix(id):
     return '{0}-Start'.format(id)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index a89612e..13020f3 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -28,5 +28,7 @@ def simple():
         resource_storage=None,
         deployment_id=models.DEPLOYMENT_ID,
         workflow_id=models.WORKFLOW_ID,
-        execution_id=models.EXECUTION_ID
+        execution_id=models.EXECUTION_ID,
+        task_max_retries=models.TASK_MAX_RETRIES,
+        task_retry_interval=models.TASK_RETRY_INTERVAL
     )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 633adbb..5547321 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -23,6 +23,8 @@ DEPLOYMENT_ID = 'test_deployment_id'
 BLUEPRINT_ID = 'test_blueprint_id'
 WORKFLOW_ID = 'test_workflow_id'
 EXECUTION_ID = 'test_execution_id'
+TASK_RETRY_INTERVAL = 1
+TASK_MAX_RETRIES = 1
 
 
 def get_dependency_node():
@@ -115,13 +117,13 @@ def get_execution():
         deployment_id=DEPLOYMENT_ID,
         workflow_id=WORKFLOW_ID,
         blueprint_id=BLUEPRINT_ID,
-        started_at=datetime.now(),
+        started_at=datetime.utcnow(),
         parameters=None
     )
 
 
 def get_deployment():
-    now = datetime.now()
+    now = datetime.utcnow()
     return models.Deployment(
         id=DEPLOYMENT_ID,
         description=None,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 92b4e78..cda295a 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -12,7 +12,7 @@
 
 testtools
 mock==1.0.1
-pylint==1.5.5
+pylint==1.6.4
 pytest==3.0.2
 pytest-cov==2.3.1
 pytest-mock==1.2

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/storage/test_models.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py
index fa7333b..19262bb 100644
--- a/tests/storage/test_models.py
+++ b/tests/storage/test_models.py
@@ -71,8 +71,8 @@ def test_blueprint_model():
         plan={},
         id='id',
         description='description',
-        created_at=datetime.now(),
-        updated_at=datetime.now(),
+        created_at=datetime.utcnow(),
+        updated_at=datetime.utcnow(),
         main_file_name='/path',
     )
     with pytest.raises(TypeError):
@@ -80,8 +80,8 @@ def test_blueprint_model():
             plan=None,
             id='id',
             description='description',
-            created_at=datetime.now(),
-            updated_at=datetime.now(),
+            created_at=datetime.utcnow(),
+            updated_at=datetime.utcnow(),
             main_file_name='/path',
         )
     with pytest.raises(TypeError):
@@ -89,8 +89,8 @@ def test_blueprint_model():
             plan={},
             id=999,
             description='description',
-            created_at=datetime.now(),
-            updated_at=datetime.now(),
+            created_at=datetime.utcnow(),
+            updated_at=datetime.utcnow(),
             main_file_name='/path',
         )
     with pytest.raises(TypeError):
@@ -98,8 +98,8 @@ def test_blueprint_model():
             plan={},
             id='id',
             description=999,
-            created_at=datetime.now(),
-            updated_at=datetime.now(),
+            created_at=datetime.utcnow(),
+            updated_at=datetime.utcnow(),
             main_file_name='/path',
         )
     with pytest.raises(TypeError):
@@ -108,7 +108,7 @@ def test_blueprint_model():
             id='id',
             description='description',
             created_at='error',
-            updated_at=datetime.now(),
+            updated_at=datetime.utcnow(),
             main_file_name='/path',
         )
     with pytest.raises(TypeError):
@@ -116,7 +116,7 @@ def test_blueprint_model():
             plan={},
             id='id',
             description='description',
-            created_at=datetime.now(),
+            created_at=datetime.utcnow(),
             updated_at=None,
             main_file_name='/path',
         )
@@ -125,15 +125,15 @@ def test_blueprint_model():
             plan={},
             id='id',
             description='description',
-            created_at=datetime.now(),
+            created_at=datetime.utcnow(),
             updated_at=None,
             main_file_name=88,
         )
     Blueprint(
         plan={},
         description='description',
-        created_at=datetime.now(),
-        updated_at=datetime.now(),
+        created_at=datetime.utcnow(),
+        updated_at=datetime.utcnow(),
         main_file_name='/path',
     )
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/api/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/__init__.py b/tests/workflows/api/__init__.py
index 09697dc..ae1e83e 100644
--- a/tests/workflows/api/__init__.py
+++ b/tests/workflows/api/__init__.py
@@ -12,4 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py
index 7119529..7ecd28c 100644
--- a/tests/workflows/api/test_task.py
+++ b/tests/workflows/api/test_task.py
@@ -67,17 +67,35 @@ class TestOperationTask(object):
         op_details = {'operation_details': True}
         node_instance = mock.models.get_dependency_node_instance()
         inputs = {'inputs': True}
+        max_retries = 10
+        retry_interval = 10
 
         with context.workflow.current.push(workflow_context):
             model_task = api.task.OperationTask(name=name,
                                                 operation_details=op_details,
                                                 node_instance=node_instance,
-                                                inputs=inputs)
+                                                inputs=inputs,
+                                                max_retries=max_retries,
+                                                retry_interval=retry_interval)
 
         assert model_task.name == name
         assert model_task.operation_details == op_details
         assert model_task.node_instance == node_instance
         assert model_task.inputs == inputs
+        assert model_task.retry_interval == retry_interval
+        assert model_task.max_retries == max_retries
+
+    def test_operation_task_default_values(self):
+        workflow_context = mock.context.simple()
+        with context.workflow.current.push(workflow_context):
+            model_task = api.task.OperationTask(
+                name='stub',
+                operation_details={},
+                node_instance=mock.models.get_dependency_node_instance())
+
+        assert model_task.inputs == {}
+        assert model_task.retry_interval == 
workflow_context.task_retry_interval
+        assert model_task.max_retries == workflow_context.task_max_retries
 
 
 class TestWorkflowTask(object):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_engine.py 
b/tests/workflows/core/test_engine.py
new file mode 100644
index 0000000..39c9cb2
--- /dev/null
+++ b/tests/workflows/core/test_engine.py
@@ -0,0 +1,322 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import time
+from datetime import datetime
+
+import pytest
+
+import aria
+from aria import events
+from aria import workflow
+from aria import context
+from aria.storage import models
+from aria.workflows import exceptions
+from aria.workflows.executor import thread
+from aria.workflows.core import engine
+from aria.workflows import api
+
+from tests import mock
+
+import tests.storage
+
+logging.basicConfig()
+global_test_holder = {}
+
+
+class BaseTest(object):
+
+    @staticmethod
+    def _execute(workflow_func, workflow_context, executor):
+        graph = workflow_func(ctx=workflow_context)
+        eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph)
+        eng.execute()
+
+    @staticmethod
+    def _op(func, ctx, inputs=None, max_retries=None, retry_interval=None):
+        return api.task.OperationTask(
+            name='task',
+            operation_details={'operation': 'tests.workflows.core.test_engine.{name}'.format(
+                name=func.__name__)},
+            node_instance=ctx.model.node_instance.get('dependency_node_instance'),
+            inputs=inputs,
+            max_retries=max_retries,
+            retry_interval=retry_interval
+        )
+
+    @pytest.fixture(scope='function', autouse=True)
+    def globals_cleanup(self):
+        try:
+            yield
+        finally:
+            global_test_holder.clear()
+
+    @pytest.fixture(scope='function', autouse=True)
+    def signals_registration(self, ):
+        def sent_task_handler(*args, **kwargs):
+            calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
+            global_test_holder['sent_task_signal_calls'] = calls + 1
+
+        def start_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('start')
+
+        def success_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('success')
+
+        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+            workflow_context.states.append('failure')
+            workflow_context.exception = exception
+
+        events.start_workflow_signal.connect(start_workflow_handler)
+        events.on_success_workflow_signal.connect(success_workflow_handler)
+        events.on_failure_workflow_signal.connect(failure_workflow_handler)
+        events.sent_task_signal.connect(sent_task_handler)
+        try:
+            yield
+        finally:
+            events.start_workflow_signal.disconnect(start_workflow_handler)
+            events.on_success_workflow_signal.disconnect(success_workflow_handler)
+            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+            events.sent_task_signal.disconnect(sent_task_handler)
+
+    @pytest.fixture(scope='function')
+    def executor(self):
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
+
+    @pytest.fixture(scope='function')
+    def workflow_context(self):
+        model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver())
+        model_storage.setup()
+        deployment = models.Deployment(
+            id='d1',
+            blueprint_id='b1',
+            description=None,
+            created_at=datetime.utcnow(),
+            updated_at=datetime.utcnow(),
+            workflows={})
+        model_storage.deployment.store(deployment)
+        node = mock.models.get_dependency_node()
+        node_instance = mock.models.get_dependency_node_instance(node)
+        model_storage.node.store(node)
+        model_storage.node_instance.store(node_instance)
+        result = context.workflow.WorkflowContext(
+            name='test',
+            model_storage=model_storage,
+            resource_storage=None,
+            deployment_id=deployment.id,
+            workflow_id='name')
+        result.states = []
+        result.exception = None
+        return result
+
+
+class TestEngine(BaseTest):
+
+    def test_empty_graph_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(**_):
+            pass
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert 'sent_task_signal_calls' not in global_test_holder
+
+    def test_single_task_successful_execution(self, workflow_context, 
executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_success_task, ctx))
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+    def test_single_task_failed_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_failed_task, ctx))
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+    def test_two_tasks_execution_order(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+            op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+            graph.sequence(op1, op2)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+        @workflow
+        def sub_workflow(ctx, graph):
+            op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+            op2 = api.task.StubTask()
+            op3 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+            graph.sequence(op1, op2, op3)
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+
+class TestRetries(BaseTest):
+
+    def test_one_max_retries_and_success_on_retry(self, workflow_context, 
executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_conditional_failure_task, ctx,
+                          inputs={'fail_on_invocations_less_than': 1},
+                          max_retries=1)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_one_max_retries_and_failure_on_retry(self, workflow_context, 
executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_conditional_failure_task, ctx,
+                          inputs={'fail_on_invocations_less_than': 2},
+                          max_retries=1)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, 
exceptions.ExecutorException)
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_two_max_retries_and_success_on_first_retry(self, 
workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_conditional_failure_task, ctx,
+                          inputs={'fail_on_invocations_less_than': 1},
+                          max_retries=2)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_two_max_retries_and_success_on_second_retry(self, 
workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_conditional_failure_task, ctx,
+                          inputs={'fail_on_invocations_less_than': 2},
+                          max_retries=2)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 3
+        assert global_test_holder.get('sent_task_signal_calls') == 3
+
+    def test_retry_interval_float(self, workflow_context, executor):
+        self._test_retry_interval(retry_interval=0.3,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def test_retry_interval_int(self, workflow_context, executor):
+        self._test_retry_interval(retry_interval=1,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def _test_retry_interval(self, retry_interval, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(mock_conditional_failure_task, ctx,
+                          inputs={'fail_on_invocations_less_than': 1},
+                          max_retries=1,
+                          retry_interval=retry_interval)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        invocation1, invocation2 = invocations
+        assert invocation2 - invocation1 >= retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+
+def mock_success_task():
+    pass
+
+
+def mock_failed_task():
+    raise RuntimeError
+
+
+def mock_ordered_task(counter):
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(counter)
+
+
+def mock_conditional_failure_task(fail_on_invocations_less_than):
+    invocations = global_test_holder.setdefault('invocations', [])
+    try:
+        if len(invocations) < fail_on_invocations_less_than:
+            raise RuntimeError
+    finally:
+        invocations.append(time.time())

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/core/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_executor.py 
b/tests/workflows/core/test_executor.py
deleted file mode 100644
index 8ec0303..0000000
--- a/tests/workflows/core/test_executor.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import uuid
-from contextlib import contextmanager
-
-import pytest
-import retrying
-
-from aria import events
-from aria.storage import models
-from aria.workflows.executor import (
-    thread,
-    multiprocess,
-    blocking,
-    # celery
-)
-
-try:
-    import celery as _celery
-    app = _celery.Celery()
-    app.conf.update(CELERY_RESULT_BACKEND='amqp://')
-except ImportError:
-    _celery = None
-    app = None
-
-
-class TestExecutor(object):
-
-    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
-        (thread.ThreadExecutor, {'pool_size': 1}),
-        (thread.ThreadExecutor, {'pool_size': 2}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
-        (blocking.CurrentThreadBlockingExecutor, {}),
-        # (celery.CeleryExecutor, {'app': app})
-    ])
-    def test_execute(self, executor_cls, executor_kwargs):
-        self.executor = executor_cls(**executor_kwargs)
-        expected_value = 'value'
-        successful_task = MockTask(mock_successful_task)
-        failing_task = MockTask(mock_failing_task)
-        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': 
expected_value})
-
-        for task in [successful_task, failing_task, task_with_inputs]:
-            self.executor.execute(task)
-
-        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
-        def assertion():
-            assert successful_task.states == ['start', 'success']
-            assert failing_task.states == ['start', 'failure']
-            assert task_with_inputs.states == ['start', 'failure']
-            assert isinstance(failing_task.exception, MockException)
-            assert isinstance(task_with_inputs.exception, MockException)
-            assert task_with_inputs.exception.message == expected_value
-        assertion()
-
-    def setup_method(self):
-        events.start_task_signal.connect(start_handler)
-        events.on_success_task_signal.connect(success_handler)
-        events.on_failure_task_signal.connect(failure_handler)
-
-    def teardown_method(self):
-        events.start_task_signal.disconnect(start_handler)
-        events.on_success_task_signal.disconnect(success_handler)
-        events.on_failure_task_signal.disconnect(failure_handler)
-        if hasattr(self, 'executor'):
-            self.executor.close()
-
-
-def mock_successful_task():
-    pass
-
-
-def mock_failing_task():
-    raise MockException
-
-
-def mock_task_with_input(input):
-    raise MockException(input)
-
-if app:
-    mock_successful_task = app.task(mock_successful_task)
-    mock_failing_task = app.task(mock_failing_task)
-    mock_task_with_input = app.task(mock_task_with_input)
-
-
-class MockException(Exception):
-    pass
-
-
-class MockTask(object):
-
-    def __init__(self, func, inputs=None):
-        self.states = []
-        self.exception = None
-        self.id = str(uuid.uuid4())
-        name = func.__name__
-        operation = 
'tests.workflows.core.test_executor.{name}'.format(name=name)
-        self.operation_details = {'operation': operation}
-        self.logger = logging.getLogger()
-        self.name = name
-        self.inputs = inputs or {}
-
-        for state in models.Task.STATES:
-            setattr(self, state.upper(), state)
-
-    @contextmanager
-    def update(self):
-        yield self
-
-
-def start_handler(task, *args, **kwargs):
-    task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
-    task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
-    task.states.append('failure')
-    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/executor/__init__.py 
b/tests/workflows/executor/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/workflows/executor/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/executor/test_executor.py 
b/tests/workflows/executor/test_executor.py
new file mode 100644
index 0000000..bbbfbc7
--- /dev/null
+++ b/tests/workflows/executor/test_executor.py
@@ -0,0 +1,138 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import uuid
+from contextlib import contextmanager
+
+import pytest
+import retrying
+
+from aria import events
+from aria.storage import models
+from aria.workflows.executor import (
+    thread,
+    multiprocess,
+    blocking,
+    # celery
+)
+
+try:
+    import celery as _celery
+    app = _celery.Celery()
+    app.conf.update(CELERY_RESULT_BACKEND='amqp://')
+except ImportError:
+    _celery = None
+    app = None
+
+
+class TestExecutor(object):
+
+    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
+        (thread.ThreadExecutor, {'pool_size': 1}),
+        (thread.ThreadExecutor, {'pool_size': 2}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+        (blocking.CurrentThreadBlockingExecutor, {}),
+        # (celery.CeleryExecutor, {'app': app})
+    ])
+    def test_execute(self, executor_cls, executor_kwargs):
+        self.executor = executor_cls(**executor_kwargs)
+        expected_value = 'value'
+        successful_task = MockTask(mock_successful_task)
+        failing_task = MockTask(mock_failing_task)
+        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': 
expected_value})
+
+        for task in [successful_task, failing_task, task_with_inputs]:
+            self.executor.execute(task)
+
+        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+        def assertion():
+            assert successful_task.states == ['start', 'success']
+            assert failing_task.states == ['start', 'failure']
+            assert task_with_inputs.states == ['start', 'failure']
+            assert isinstance(failing_task.exception, MockException)
+            assert isinstance(task_with_inputs.exception, MockException)
+            assert task_with_inputs.exception.message == expected_value
+        assertion()
+
+    def setup_method(self):
+        events.start_task_signal.connect(start_handler)
+        events.on_success_task_signal.connect(success_handler)
+        events.on_failure_task_signal.connect(failure_handler)
+
+    def teardown_method(self):
+        events.start_task_signal.disconnect(start_handler)
+        events.on_success_task_signal.disconnect(success_handler)
+        events.on_failure_task_signal.disconnect(failure_handler)
+        if hasattr(self, 'executor'):
+            self.executor.close()
+
+
+def mock_successful_task():
+    pass
+
+
+def mock_failing_task():
+    raise MockException
+
+
+def mock_task_with_input(input):
+    raise MockException(input)
+
+if app:
+    mock_successful_task = app.task(mock_successful_task)
+    mock_failing_task = app.task(mock_failing_task)
+    mock_task_with_input = app.task(mock_task_with_input)
+
+
+class MockException(Exception):
+    pass
+
+
+class MockTask(object):
+
+    def __init__(self, func, inputs=None):
+        self.states = []
+        self.exception = None
+        self.id = str(uuid.uuid4())
+        name = func.__name__
+        operation = 'tests.workflows.executor.test_executor.{name}'.format(name=name)
+        self.operation_details = {'operation': operation}
+        self.logger = logging.getLogger()
+        self.name = name
+        self.inputs = inputs or {}
+        self.retry_count = 0
+        self.max_retries = 0
+
+        for state in models.Task.STATES:
+            setattr(self, state.upper(), state)
+
+    @contextmanager
+    def update(self):
+        yield self
+
+
+def start_handler(task, *args, **kwargs):
+    task.states.append('start')
+
+
+def success_handler(task, *args, **kwargs):
+    task.states.append('success')
+
+
+def failure_handler(task, exception, *args, **kwargs):
+    task.states.append('failure')
+    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/feacd9f6/tests/workflows/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py
deleted file mode 100644
index ea703f5..0000000
--- a/tests/workflows/test_engine.py
+++ /dev/null
@@ -1,187 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from datetime import datetime
-
-import pytest
-
-import aria
-from aria import events
-from aria import workflow
-from aria import context
-from aria.storage import models
-from aria.workflows import exceptions
-from aria.workflows.executor import thread
-from aria.workflows.core import engine
-from aria.workflows import api
-
-from .. import mock
-
-import tests.storage
-
-
-global_test_holder = {}
-
-
-class TestEngine(object):
-
-    def test_empty_graph_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(**_):
-            pass
-        self._execute(workflow_func=mock_workflow,
-                      workflow_context=workflow_context,
-                      executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert 'sent_task_signal_calls' not in global_test_holder
-
-    def test_single_task_successful_execution(self, workflow_context, 
executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(mock_success_task, ctx))
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-    def test_single_task_failed_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(mock_failed_task, ctx))
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, 
exceptions.ExecutorException)
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-    def test_two_tasks_execution_order(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
-            op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
-            graph.sequence(op1, op2)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('invocations') == [1, 2]
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    @staticmethod
-    def _execute(workflow_func, workflow_context, executor):
-        graph = workflow_func(ctx=workflow_context)
-        eng = engine.Engine(executor=executor, 
workflow_context=workflow_context, tasks_graph=graph)
-        eng.execute()
-
-    @staticmethod
-    def _op(func, ctx, inputs=None):
-        return api.task.OperationTask(
-            name='task',
-            operation_details={'operation': 
'tests.workflows.test_engine.{name}'.format(
-                name=func.__name__)},
-            
node_instance=ctx.model.node_instance.get('dependency_node_instance'),
-            inputs=inputs
-        )
-
-    @pytest.fixture(scope='function', autouse=True)
-    def globals_cleanup(self):
-        try:
-            yield
-        finally:
-            global_test_holder.clear()
-
-    @pytest.fixture(scope='function', autouse=True)
-    def signals_registration(self, ):
-        def sent_task_handler(*args, **kwargs):
-            calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
-            global_test_holder['sent_task_signal_calls'] = calls + 1
-
-        def start_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('start')
-
-        def success_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('success')
-
-        def failure_workflow_handler(workflow_context, exception, *args, 
**kwargs):
-            workflow_context.states.append('failure')
-            workflow_context.exception = exception
-
-        events.start_workflow_signal.connect(start_workflow_handler)
-        events.on_success_workflow_signal.connect(success_workflow_handler)
-        events.on_failure_workflow_signal.connect(failure_workflow_handler)
-        events.sent_task_signal.connect(sent_task_handler)
-        try:
-            yield
-        finally:
-            events.start_workflow_signal.disconnect(start_workflow_handler)
-            
events.on_success_workflow_signal.disconnect(success_workflow_handler)
-            
events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
-            events.sent_task_signal.disconnect(sent_task_handler)
-
-    @pytest.fixture(scope='function')
-    def executor(self):
-        result = thread.ThreadExecutor()
-        try:
-            yield result
-        finally:
-            result.close()
-
-    @pytest.fixture(scope='function')
-    def workflow_context(self):
-        model_storage = 
aria.application_model_storage(tests.storage.InMemoryModelDriver())
-        model_storage.setup()
-        deployment = models.Deployment(
-            id='d1',
-            blueprint_id='b1',
-            description=None,
-            created_at=datetime.now(),
-            updated_at=datetime.now(),
-            workflows={})
-        model_storage.deployment.store(deployment)
-        node = mock.models.get_dependency_node()
-        node_instance = mock.models.get_dependency_node_instance(node)
-        model_storage.node.store(node)
-        model_storage.node_instance.store(node_instance)
-        result = context.workflow.WorkflowContext(
-            name='test',
-            model_storage=model_storage,
-            resource_storage=None,
-            deployment_id=deployment.id,
-            workflow_id='name')
-        result.states = []
-        result.exception = None
-        return result
-
-
-def mock_success_task():
-    pass
-
-
-def mock_failed_task():
-    raise RuntimeError
-
-
-def mock_ordered_task(counter):
-    invocations = global_test_holder.setdefault('invocations', [])
-    invocations.append(counter)

Reply via email to