Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks f36fe86c1 -> 62fa985c6


wip2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/62fa985c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/62fa985c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/62fa985c

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 62fa985c6bf1e888710dd109a35cc122d7841ba2
Parents: f36fe86
Author: max-orlov <[email protected]>
Authored: Mon Jun 12 19:47:50 2017 +0300
Committer: max-orlov <[email protected]>
Committed: Mon Jun 12 19:47:50 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |  68 ++-
 aria/orchestrator/workflows/core/engine.py      |  13 +-
 .../workflows/core/events_handler.py            |  72 +--
 aria/orchestrator/workflows/core/translation.py |   6 +-
 aria/orchestrator/workflows/events_logging.py   |  25 +-
 aria/orchestrator/workflows/executor/base.py    |  31 +-
 aria/orchestrator/workflows/executor/process.py |  18 +-
 aria/orchestrator/workflows/executor/thread.py  |  19 +-
 tests/orchestrator/context/__init__.py          |   5 +-
 tests/orchestrator/context/test_operation.py    |  14 +-
 .../orchestrator/execution_plugin/test_local.py |   4 +-
 .../orchestrator/workflows/core/_test_engine.py | 519 -------------------
 .../orchestrator/workflows/core/test_engine.py  | 519 +++++++++++++++++++
 .../orchestrator/workflows/core/test_events.py  |   2 +-
 .../orchestrator/workflows/executor/__init__.py |  22 +-
 .../workflows/executor/test_executor.py         |   2 +-
 16 files changed, 665 insertions(+), 674 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 36f1421..9ac885d 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -21,6 +21,7 @@ classes:
 """
 
 # pylint: disable=no-self-argument, no-member, abstract-method
+from contextlib import contextmanager
 from datetime import datetime
 
 from sqlalchemy import (
@@ -308,25 +309,8 @@ class TaskBase(mixins.ModelMixin):
     api_id = Column(String)
 
     _executor = Column(PickleType)
-    _executor_kwargs = Column(PickleType, default=None)
-
-    _context = Column(PickleType)
-    _context_kwargs = Column(PickleType, default=None)
-
-    @property
-    def executor(self):
-        return self._executor(**(self._executor_kwargs or {}))
-
-    @property
-    def context(self):
-        return self._context.instantiate_from_dict(task_id=self.id, **self._context_kwargs)
-
-    def execute(self):
-        executor = self.executor
-        try:
-            return executor.execute(self)
-        finally:
-            executor.close()
+    _executor_kwargs = Column(modeling_types.Dict)
+    _context_cls = Column(PickleType)
 
     @declared_attr
     def logs(cls):
@@ -438,17 +422,14 @@ class TaskBase(mixins.ModelMixin):
             return self.status in (self.PENDING, self.RETRYING)
 
     @classmethod
-    def from_api_task(cls, api_task, executor, executor_kwargs=None):
-        from aria.modeling import models
+    def from_api_task(cls, api_task, executor, executor_kwargs=None, **kwargs):
         from aria.orchestrator import context
+        instantiation_kwargs = {}
 
-        instantiation_kwargs = {'_executor': executor,
-                                '_executor_kwargs': executor_kwargs}
-
-        if isinstance(api_task.actor, models.Node):
+        if hasattr(api_task.actor, 'outbound_relationships'):
             context_cls = context.operation.NodeOperationContext
             instantiation_kwargs['node'] = api_task.actor
-        elif isinstance(api_task.actor, models.Relationship):
+        elif hasattr(api_task.actor, 'source_node'):
             context_cls = context.operation.RelationshipOperationContext
             instantiation_kwargs['relationship'] = api_task.actor
         else:
@@ -471,21 +452,34 @@ class TaskBase(mixins.ModelMixin):
                 'function': api_task.function,
                 'arguments': api_task.arguments,
                 'api_id': api_task.id,
-
-                '_context': context_cls,
-                '_context_kwargs': {
-                    'name': api_task.name,
-                    'model_storage': api_task._workflow_context.model.serialization_dict,
-                    'resource_storage': api_task._workflow_context.resource.serialization_dict,
-                    'service_id': api_task._workflow_context._service_id,
-                    'actor_id': api_task.id,
-                    'execution_id': api_task._workflow_context._execution_id,
-                    'workdir': api_task._workflow_context._workdir
-                }
+                '_context_cls': context_cls,
+                '_executor': executor,
+                '_executor_kwargs': executor_kwargs or {}
         })
 
+        instantiation_kwargs.update(**kwargs)
+
         return cls(**instantiation_kwargs)
 
+    def execute(self, ctx):
+        from aria.orchestrator.context import operation
+        context_cls = self._context_cls or operation.BaseOperationContext
+        op_ctx = context_cls(
+            model_storage=ctx.model,
+            resource_storage=ctx.resource,
+            workdir=ctx._workdir,
+            task_id=self.id,
+            actor_id=self.actor.id if self.actor else None,
+            service_id=self.execution.service.id,
+            execution_id=self.execution.id,
+            name=self.name
+        )
+        executor = self._executor(**(self._executor_kwargs or {}))
+        try:
+            return executor.execute(op_ctx)
+        finally:
+            executor.close()
+
 
 class LogBase(mixins.ModelMixin):
 

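The net effect of the orchestration.py change: a Task row no longer pickles a fully-built
operation context. It persists only the executor class, a plain executor-kwargs dict and the
context class, and execute() rebuilds both lazily from the workflow context at run time. A
minimal sketch of the resulting flow, mirroring TaskBase.execute() above ("task" is a persisted
models.Task created via from_api_task(), "workflow_ctx" is the caller's workflow context):

    # Sketch only -- not part of the commit.
    def run_task(task, workflow_ctx):
        context_cls = task._context_cls                       # a pickled class, not an instance
        op_ctx = context_cls(model_storage=workflow_ctx.model,
                             resource_storage=workflow_ctx.resource,
                             workdir=workflow_ctx._workdir,
                             task_id=task.id,
                             actor_id=task.actor.id if task.actor else None,
                             service_id=task.execution.service.id,
                             execution_id=task.execution.id,
                             name=task.name)
        executor = task._executor(**(task._executor_kwargs or {}))   # executor built on demand
        try:
            return executor.execute(op_ctx)                   # executors now receive a ctx
        finally:
            executor.close()
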
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 120d83a..02749b7 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -43,12 +43,12 @@ class Engine(logger.LoggerMixin):
         self._execution_graph = networkx.DiGraph()
         translation.build_execution_graph(task_graph=tasks_graph,
                                           execution_graph=self._execution_graph,
-                                          executor_kwargs=executor_kwargs,
                                           default_executor=executor,
+                                          executor_kwargs=executor_kwargs,
                                           execution=workflow_context.execution)
 
-        # Update the storage
-        workflow_context.execution = workflow_context.execution
+        # Flush changes
+        workflow_context.model.execution._session.flush()
 
     def execute(self):
         """
@@ -99,7 +99,7 @@ class Engine(logger.LoggerMixin):
         return (task for task in self._tasks_iter() if task.has_ended())
 
     def _task_has_dependencies(self, task):
-        return len(self._execution_graph.pred.get(task.id, {})) > 0
+        return len(self._execution_graph.pred.get(task.api_id, {})) > 0
 
     def _all_tasks_consumed(self):
         return len(self._execution_graph.node) == 0
@@ -112,11 +112,10 @@ class Engine(logger.LoggerMixin):
                     self._workflow_context.model.task.refresh(task)
             yield task
 
-    @staticmethod
-    def _handle_executable_task(task):
+    def _handle_executable_task(self, task):
         if not task.stub_type:
             events.sent_task_signal.send(task)
-        task.execute()
+        task.execute(self._workflow_context)
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:

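The engine now receives an executor class plus a separate kwargs dict (so both can be persisted
per task) and passes its own workflow context into task.execute(). A sketch of the wiring under
the new signature, matching how the tests below construct the engine; pool_size is an assumed
ThreadExecutor constructor argument, and workflow_context/graph come from the caller:

    from aria.orchestrator.workflows.core import engine
    from aria.orchestrator.workflows.executor import thread

    eng = engine.Engine(executor=thread.ThreadExecutor,    # a class, not an instance
                        executor_kwargs={'pool_size': 1},  # assumed ThreadExecutor kwarg
                        workflow_context=workflow_context,
                        tasks_graph=graph)
    eng.execute()
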
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index c733e79..5d979c4 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -35,44 +35,43 @@ def _task_sent(task, *args, **kwargs):
 
 
 @events.start_task_signal.connect
-def _task_started(task, *args, **kwargs):
-    task.started_at = datetime.utcnow()
-    task.status = task.STARTED
-    _update_node_state_if_necessary(task, is_transitional=True)
+def _task_started(ctx, *args, **kwargs):
+    ctx.task.started_at = datetime.utcnow()
+    ctx.task.status = ctx.task.STARTED
+    _update_node_state_if_necessary(ctx, is_transitional=True)
 
 
 @events.on_failure_task_signal.connect
-def _task_failed(task, exception, *args, **kwargs):
-    with task._update():
-        should_retry = all([
-            not isinstance(exception, exceptions.TaskAbortException),
-            task.attempts_count < task.max_attempts or task.max_attempts == task.INFINITE_RETRIES,
-            # ignore_failure check here means the task will not be retried and it will be marked
-            # as failed. The engine will also look at ignore_failure so it won't fail the
-            # workflow.
-            not task.ignore_failure
-        ])
-        if should_retry:
-            retry_interval = None
-            if isinstance(exception, exceptions.TaskRetryException):
-                retry_interval = exception.retry_interval
-            if retry_interval is None:
-                retry_interval = task.retry_interval
-            task.status = task.RETRYING
-            task.attempts_count += 1
-            task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
-        else:
-            task.ended_at = datetime.utcnow()
-            task.status = task.FAILED
+def _task_failed(ctx, exception, *args, **kwargs):
+    should_retry = all([
+        not isinstance(exception, exceptions.TaskAbortException),
+        ctx.task.attempts_count < ctx.task.max_attempts or
+        ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
+        # ignore_failure check here means the task will not be retried and it will be marked
+        # as failed. The engine will also look at ignore_failure so it won't fail the
+        # workflow.
+        not ctx.task.ignore_failure
+    ])
+    if should_retry:
+        retry_interval = None
+        if isinstance(exception, exceptions.TaskRetryException):
+            retry_interval = exception.retry_interval
+        if retry_interval is None:
+            retry_interval = ctx.task.retry_interval
+        ctx.task.status = ctx.task.RETRYING
+        ctx.task.attempts_count += 1
+        ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+    else:
+        ctx.task.ended_at = datetime.utcnow()
+        ctx.task.status = ctx.task.FAILED
 
 
 @events.on_success_task_signal.connect
-def _task_succeeded(task, *args, **kwargs):
-    with task._update():
-        task.ended_at = datetime.utcnow()
-        task.status = task.SUCCESS
+def _task_succeeded(ctx, *args, **kwargs):
+    ctx.task.ended_at = datetime.utcnow()
+    ctx.task.status = ctx.task.SUCCESS
 
-    _update_node_state_if_necessary(task)
+    _update_node_state_if_necessary(ctx)
 
 
 @events.start_workflow_signal.connect
@@ -131,16 +130,17 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):
         workflow_context.execution = execution
 
 
-def _update_node_state_if_necessary(task, is_transitional=False):
+def _update_node_state_if_necessary(ctx, is_transitional=False):
     # TODO: this is not the right way to check! the interface name is arbitrary
     # and also will *never* be the type name
-    node = task.node if task is not None else None
+    node = ctx.task.node if ctx.task is not None else None
     if (node is not None) and \
-        (task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')):
-        state = node.determine_state(op_name=task.operation_name, is_transitional=is_transitional)
+        (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')):
+        state = node.determine_state(op_name=ctx.task.operation_name,
+                                     is_transitional=is_transitional)
         if state:
             node.state = state
-            task.context.model.node.update(node)
+            ctx.model.node.update(node)
 
 
 def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status):

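Task-level signal handlers now receive the operation context rather than the task model; the
task is reached through ctx.task and storage through ctx.model. A sketch of a custom handler
under the new convention (the handler name is illustrative):

    from aria.orchestrator import events

    def _my_task_started_handler(ctx, *args, **kwargs):
        # ctx is an operation context: ctx.task is the task model,
        # and ctx.logger / ctx.model are available directly.
        ctx.logger.info('{0} started'.format(ctx.task.name))

    events.start_task_signal.connect(_my_task_started_handler)
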
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 50fd65b..93c173e 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -26,8 +26,8 @@ def build_execution_graph(
         task_graph,
         execution_graph,
         default_executor,
+        executor_kwargs,
         execution,
-        executor_kwargs=None,
         start_stub_type=models.Task.START_WORKFLOW,
         end_stub_type=models.Task.END_WORKFLOW,
         depends_on=()):
@@ -63,8 +63,8 @@ def build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
                 default_executor=default_executor,
-                execution=execution,
                 executor_kwargs=executor_kwargs,
+                execution=execution,
                 start_stub_type=models.Task.START_SUBWROFKLOW,
                 end_stub_type=models.Task.END_SUBWORKFLOW,
                 depends_on=operation_dependencies
@@ -73,7 +73,7 @@ def build_execution_graph(
             stub_task = models.Task(api_id=api_task.id,
                                     _executor=base.StubTaskExecutor,
                                     execution=execution,
-                                    stub_type=models.StubTask.STUB)
+                                    stub_type=models.Task.STUB)
             _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
         else:
             raise RuntimeError('Undefined state')

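executor_kwargs is now a required argument of build_execution_graph() rather than a keyword
defaulting to None, so callers must pass it explicitly, even if empty. A sketch of the call,
assuming the same names the engine uses above:

    translation.build_execution_graph(task_graph=tasks_graph,
                                      execution_graph=execution_graph,
                                      default_executor=thread.ThreadExecutor,
                                      executor_kwargs={},   # now explicit, may be empty
                                      execution=workflow_context.execution)
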
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 036c1f7..4cee867 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -34,31 +34,32 @@ def _get_task_name(task):
 
 
 @events.start_task_signal.connect
-def _start_task_handler(task, **kwargs):
+def _start_task_handler(ctx, **kwargs):
     # If the task has no function this is an empty task.
-    if task.function:
+    if ctx.task.function:
         suffix = 'started...'
-        logger = task.context.logger.info
+        logger = ctx.logger.info
     else:
         suffix = 'has no implementation'
-        logger = task.context.logger.debug
+        logger = ctx.logger.debug
 
     logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
-        name=_get_task_name(task), task=task, suffix=suffix))
+        name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
+
 
 @events.on_success_task_signal.connect
-def _success_task_handler(task, **kwargs):
-    if not task.function:
+def _success_task_handler(ctx, **kwargs):
+    if not ctx.task.function:
         return
-    task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
-                             .format(name=_get_task_name(task), task=task))
+    ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
+                    .format(name=_get_task_name(ctx.task), task=ctx.task))
 
 
 @events.on_failure_task_signal.connect
-def _failure_operation_handler(task, traceback, **kwargs):
-    task.context.logger.error(
+def _failure_operation_handler(ctx, traceback, **kwargs):
+    ctx.logger.error(
         '{name} {task.interface_name}.{task.operation_name} failed'
-        .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback)
+        .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
     )
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 7fece6f..69e4a39 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -28,19 +28,21 @@ class BaseExecutor(logger.LoggerMixin):
     def _execute(self, task):
         raise NotImplementedError
 
-    def execute(self, task):
+    def execute(self, ctx):
         """
         Execute a task
         :param task: task to execute
         """
-        if task.function:
-            self._execute(task)
+        if ctx.task.function:
+            self._execute(ctx)
+            ctx.model.task.update(ctx.task)
         else:
             # In this case the task is missing a function. This task still gets to an
             # executor, but since there is nothing to run, we by default simply skip the execution
             # itself.
-            self._task_started(task)
-            self._task_succeeded(task)
+            self._task_started(ctx)
+            self._task_succeeded(ctx)
+            ctx.model.task.update(ctx.task)
 
     def close(self):
         """
@@ -49,18 +51,21 @@ class BaseExecutor(logger.LoggerMixin):
         pass
 
     @staticmethod
-    def _task_started(task):
-        events.start_task_signal.send(task)
+    def _task_started(ctx):
+        events.start_task_signal.send(ctx)
+        ctx.model.task.update(ctx.task)
 
     @staticmethod
-    def _task_failed(task, exception, traceback=None):
-        events.on_failure_task_signal.send(task, exception=exception, traceback=traceback)
+    def _task_failed(ctx, exception, traceback=None):
+        events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback)
+        ctx.model.task.update(ctx.task)
 
     @staticmethod
-    def _task_succeeded(task):
-        events.on_success_task_signal.send(task)
+    def _task_succeeded(ctx):
+        events.on_success_task_signal.send(ctx)
+        ctx.model.task.update(ctx.task)
 
 
 class StubTaskExecutor(BaseExecutor):                                                             # pylint: disable=abstract-method
-    def execute(self, task):
-        task.status = task.SUCCESS
+    def execute(self, ctx):
+        ctx.task.status = ctx.task.SUCCESS

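Concrete executors now implement _execute(ctx) and report through the ctx-taking helpers, which
also persist task state via ctx.model.task.update(). A sketch of a minimal synchronous executor
under the new contract; the class itself is hypothetical, its body mirrors
ThreadExecutor._processor in the thread.py diff below, and the aria.utils import path is assumed
from that usage:

    import sys

    from aria.utils import imports, exceptions
    from aria.orchestrator.workflows.executor import base

    class InlineExecutor(base.BaseExecutor):    # hypothetical example, runs in the calling thread
        def _execute(self, ctx):
            self._task_started(ctx)
            try:
                task_func = imports.load_attribute(ctx.task.function)
                arguments = dict(arg.unwrapped for arg in ctx.task.arguments.values())
                task_func(ctx=ctx, **arguments)
                self._task_succeeded(ctx)
            except BaseException as e:
                self._task_failed(
                    ctx, exception=e,
                    traceback=exceptions.get_exception_as_string(*sys.exc_info()))
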
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 634f1f2..8518b33 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -113,17 +113,17 @@ class ProcessExecutor(base.BaseExecutor):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
-    def _execute(self, task):
+    def _execute(self, ctx):
         self._check_closed()
-        self._tasks[task.id] = task
+        self._tasks[ctx.task.id] = ctx
 
         # Temporary file used to pass arguments to the started subprocess
         file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
         os.close(file_descriptor)
         with open(arguments_json_path, 'wb') as f:
-            f.write(pickle.dumps(self._create_arguments_dict(task)))
+            f.write(pickle.dumps(self._create_arguments_dict(ctx)))
 
-        env = self._construct_subprocess_env(task=task)
+        env = self._construct_subprocess_env(task=ctx.task)
         # Asynchronously start the operation in a subprocess
         subprocess.Popen(
             '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path),
@@ -137,13 +137,13 @@ class ProcessExecutor(base.BaseExecutor):
         if self._stopped:
             raise RuntimeError('Executor closed')
 
-    def _create_arguments_dict(self, task):
+    def _create_arguments_dict(self, ctx):
         return {
-            'task_id': task.id,
-            'function': task.function,
-            'operation_arguments': dict(arg.unwrapped for arg in task.arguments.values()),
+            'task_id': ctx.task.id,
+            'function': ctx.task.function,
+            'operation_arguments': dict(arg.unwrapped for arg in ctx.task.arguments.values()),
             'port': self._server_port,
-            'context': task.context.serialization_dict,
+            'context': ctx.serialization_dict,
         }
 
     def _construct_subprocess_env(self, task):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index a44499e..8c447b6 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -46,8 +46,8 @@ class ThreadExecutor(BaseExecutor):
             thread.start()
             self._pool.append(thread)
 
-    def _execute(self, task):
-        self._queue.put(task)
+    def _execute(self, ctx):
+        self._queue.put(ctx)
 
     def close(self):
         self._stopped = True
@@ -57,16 +57,15 @@ class ThreadExecutor(BaseExecutor):
     def _processor(self):
         while not self._stopped:
             try:
-                task = self._queue.get(timeout=1)
-                task = task.context.task
-                self._task_started(task)
+                ctx = self._queue.get(timeout=1)
+                self._task_started(ctx)
                 try:
-                    task_func = imports.load_attribute(task.function)
-                    arguments = dict(arg.unwrapped for arg in task.arguments.values())
-                    task_func(ctx=task.context, **arguments)
-                    self._task_succeeded(task)
+                    task_func = imports.load_attribute(ctx.task.function)
+                    arguments = dict(arg.unwrapped for arg in ctx.task.arguments.values())
+                    task_func(ctx=ctx, **arguments)
+                    self._task_succeeded(ctx)
                 except BaseException as e:
-                    self._task_failed(task,
+                    self._task_failed(ctx,
                                       exception=e,
                                       traceback=exceptions.get_exception_as_string(*sys.exc_info()))
             # Daemon threads

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index 4fde0a7..89a7b2c 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -23,7 +23,8 @@ def op_path(func, module_path=None):
     return '{0}.{1}'.format(module_path, func.__name__)
 
 
-def execute(workflow_func, workflow_context, executor):
+def execute(workflow_func, workflow_context, executor, executor_kwargs=None):
     graph = workflow_func(ctx=workflow_context)
-    eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph)
+    eng = engine.Engine(executor=executor, executor_kwargs=executor_kwargs,
+                        workflow_context=workflow_context, tasks_graph=graph)
     eng.execute()

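Tests now hand this helper the executor class and its kwargs instead of a pre-built instance, so
construction is deferred to the engine and, ultimately, to each task. Assumed usage, with
my_workflow and ctx standing in for a workflow function and context from the test fixtures:

    from aria.orchestrator.workflows.executor import thread

    execute(workflow_func=my_workflow,
            workflow_context=ctx,
            executor=thread.ThreadExecutor,    # a class, not an instance
            executor_kwargs={})
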
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 3dcfaa2..1cb8f65 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -263,15 +263,10 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
 @pytest.fixture(params=[
     (thread.ThreadExecutor, {}),
-    (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
+    # (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
 ])
 def executor(request):
-    executor_cls, executor_kwargs = request.param
-    result = executor_cls(**executor_kwargs)
-    try:
-        yield result
-    finally:
-        result.close()
+    return request.param
 
 
 def test_node_operation_logging(ctx, executor):
@@ -305,12 +300,13 @@ def test_node_operation_logging(ctx, executor):
             )
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0],
+            executor_kwargs=dict(ctx=ctx))
     _assert_loggins(ctx, arguments)
 
 
 def test_relationship_operation_logging(ctx, executor):
-    interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0]
+    interface_name, operation_name = mock.operations.RELATIONHIP_OPERATIONS_INSTALL[0]
 
     relationship = ctx.model.relationship.list()[0]
     arguments = {

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index d792a57..8e8c6e0 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -508,9 +508,7 @@ if __name__ == '__main__':
 
     @pytest.fixture
     def executor(self):
-        result = process.ProcessExecutor()
-        yield result
-        result.close()
+        return process.ProcessExecutor
 
     @pytest.fixture
     def workflow_context(self, tmpdir):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/core/_test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/_test_engine.py b/tests/orchestrator/workflows/core/_test_engine.py
deleted file mode 100644
index 7ffb92a..0000000
--- a/tests/orchestrator/workflows/core/_test_engine.py
+++ /dev/null
@@ -1,519 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import time
-import threading
-from datetime import datetime
-
-import pytest
-
-from aria.orchestrator import (
-    events,
-    workflow,
-    operation,
-)
-from aria.modeling import models
-from aria.orchestrator.workflows import (
-    api,
-    exceptions,
-)
-from aria.orchestrator.workflows.core import engine
-from aria.orchestrator.workflows.executor import thread
-
-from tests import mock, storage
-
-
-global_test_holder = {}
-
-
-class BaseTest(object):
-
-    @classmethod
-    def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None):
-        eng = cls._engine(workflow_func=workflow_func,
-                          workflow_context=workflow_context,
-                          executor=executor,
-                          executor_kwargs=executor_kwargs)
-        eng.execute()
-        return eng
-
-    @staticmethod
-    def _engine(workflow_func, workflow_context, executor, executor_kwargs=None):
-        graph = workflow_func(ctx=workflow_context)
-        return engine.Engine(executor=executor,
-                             executor_kwargs=executor_kwargs,
-                             workflow_context=workflow_context,
-                             tasks_graph=graph)
-
-    @staticmethod
-    def _op(ctx,
-            func,
-            arguments=None,
-            max_attempts=None,
-            retry_interval=None,
-            ignore_failure=None):
-        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-        interface_name = 'aria.interfaces.lifecycle'
-        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
-            name=__name__, func=func))
-        if arguments:
-            # the operation has to declare the arguments before those may be passed
-            operation_kwargs['arguments'] = arguments
-        operation_name = 'create'
-        interface = mock.models.create_interface(node.service, interface_name, operation_name,
-                                                 operation_kwargs=operation_kwargs)
-        node.interfaces[interface.name] = interface
-
-        return api.task.OperationTask(
-            node,
-            interface_name='aria.interfaces.lifecycle',
-            operation_name=operation_name,
-            arguments=arguments,
-            max_attempts=max_attempts,
-            retry_interval=retry_interval,
-            ignore_failure=ignore_failure,
-        )
-
-    @pytest.fixture(autouse=True)
-    def globals_cleanup(self):
-        try:
-            yield
-        finally:
-            global_test_holder.clear()
-
-    @pytest.fixture(autouse=True)
-    def signals_registration(self, ):
-        def sent_task_handler(task, *args, **kwargs):
-            if task.stub_type is None:
-                calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
-                global_test_holder['sent_task_signal_calls'] = calls + 1
-
-        def start_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('start')
-
-        def success_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('success')
-
-        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
-            workflow_context.states.append('failure')
-            workflow_context.exception = exception
-
-        def cancel_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('cancel')
-
-        events.start_workflow_signal.connect(start_workflow_handler)
-        events.on_success_workflow_signal.connect(success_workflow_handler)
-        events.on_failure_workflow_signal.connect(failure_workflow_handler)
-        events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
-        events.sent_task_signal.connect(sent_task_handler)
-        try:
-            yield
-        finally:
-            events.start_workflow_signal.disconnect(start_workflow_handler)
-            events.on_success_workflow_signal.disconnect(success_workflow_handler)
-            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
-            events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
-            events.sent_task_signal.disconnect(sent_task_handler)
-
-    @pytest.fixture
-    def executor(self):
-        return thread.ThreadExecutor
-
-    @pytest.fixture
-    def workflow_context(self, tmpdir):
-        workflow_context = mock.context.simple(str(tmpdir))
-        workflow_context.states = []
-        workflow_context.exception = None
-        yield workflow_context
-        storage.release_sqlite_storage(workflow_context.model)
-
-
-class TestEngine(BaseTest):
-
-    def test_empty_graph_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(**_):
-            pass
-        self._execute(workflow_func=mock_workflow,
-                      workflow_context=workflow_context,
-                      executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert 'sent_task_signal_calls' not in global_test_holder
-        execution = workflow_context.execution
-        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
-        assert execution.error is None
-        assert execution.status == models.Execution.SUCCEEDED
-
-    def test_single_task_successful_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(ctx, func=mock_success_task))
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-    def test_single_task_failed_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(ctx, func=mock_failed_task))
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-        execution = workflow_context.execution
-        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
-        assert execution.error is not None
-        assert execution.status == models.Execution.FAILED
-
-    def test_two_tasks_execution_order(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
-            op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
-            graph.sequence(op1, op2)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('invocations') == [1, 2]
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
-        @workflow
-        def sub_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
-            op2 = api.task.StubTask()
-            op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
-            graph.sequence(op1, op2, op3)
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
-        self._execute(workflow_func=mock_workflow,
-                      workflow_context=workflow_context,
-                      executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('invocations') == [1, 2]
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-
-class TestCancel(BaseTest):
-
-    def test_cancel_started_execution(self, workflow_context, executor):
-        number_of_tasks = 100
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            operations = (
-                self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
-                for _ in range(number_of_tasks)
-            )
-            return graph.sequence(*operations)
-
-        eng = self._engine(workflow_func=mock_workflow,
-                           workflow_context=workflow_context,
-                           executor=executor)
-        t = threading.Thread(target=eng.execute)
-        t.start()
-        time.sleep(10)
-        eng.cancel_execution()
-        t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow
-        assert not t.is_alive() # if join is timed out it will not raise an exception
-        assert workflow_context.states == ['start', 'cancel']
-        assert workflow_context.exception is None
-        invocations = global_test_holder.get('invocations', [])
-        assert 0 < len(invocations) < number_of_tasks
-        execution = workflow_context.execution
-        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
-        assert execution.error is None
-        assert execution.status == models.Execution.CANCELLED
-
-    def test_cancel_pending_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(graph, **_):
-            return graph
-        eng = self._engine(workflow_func=mock_workflow,
-                           workflow_context=workflow_context,
-                           executor=executor)
-        eng.cancel_execution()
-        execution = workflow_context.execution
-        assert execution.status == models.Execution.CANCELLED
-
-
-class TestRetries(BaseTest):
-
-    def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=2)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 2},
-                          max_attempts=2)
-            graph.add_tasks(op)
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=3)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 2},
-                          max_attempts=3)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 3
-        assert global_test_holder.get('sent_task_signal_calls') == 3
-
-    def test_infinite_retries(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=-1)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_retry_interval_float(self, workflow_context, executor):
-        self._test_retry_interval(retry_interval=0.3,
-                                  workflow_context=workflow_context,
-                                  executor=executor)
-
-    def test_retry_interval_int(self, workflow_context, executor):
-        self._test_retry_interval(retry_interval=1,
-                                  workflow_context=workflow_context,
-                                  executor=executor)
-
-    def _test_retry_interval(self, retry_interval, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=2,
-                          retry_interval=retry_interval)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 2
-        invocation1, invocation2 = invocations
-        assert invocation2 - invocation1 >= retry_interval
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_ignore_failure(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          ignore_failure=True,
-                          arguments={'failure_count': 100},
-                          max_attempts=100)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 1
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-
-class TestTaskRetryAndAbort(BaseTest):
-    message = 'EXPECTED_ERROR'
-
-    def test_task_retry_default_interval(self, workflow_context, executor):
-        default_retry_interval = 0.1
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_retry,
-                          arguments={'message': self.message},
-                          retry_interval=default_retry_interval,
-                          max_attempts=2)
-            graph.add_tasks(op)
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 2
-        invocation1, invocation2 = invocations
-        assert invocation2 - invocation1 >= default_retry_interval
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_task_retry_custom_interval(self, workflow_context, executor):
-        default_retry_interval = 100
-        custom_retry_interval = 0.1
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_retry,
-                          arguments={'message': self.message,
-                                     'retry_interval': custom_retry_interval},
-                          retry_interval=default_retry_interval,
-                          max_attempts=2)
-            graph.add_tasks(op)
-        execution_start = time.time()
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        execution_end = time.time()
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 2
-        assert (execution_end - execution_start) < default_retry_interval
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_task_abort(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_abort,
-                          arguments={'message': self.message},
-                          retry_interval=100,
-                          max_attempts=100)
-            graph.add_tasks(op)
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 1
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-
-@operation
-def mock_success_task(**_):
-    pass
-
-
-@operation
-def mock_failed_task(**_):
-    raise RuntimeError
-
-
-@operation
-def mock_ordered_task(counter, **_):
-    invocations = global_test_holder.setdefault('invocations', [])
-    invocations.append(counter)
-
-
-@operation
-def mock_conditional_failure_task(failure_count, **_):
-    invocations = global_test_holder.setdefault('invocations', [])
-    try:
-        if len(invocations) < failure_count:
-            raise RuntimeError
-    finally:
-        invocations.append(time.time())
-
-
-@operation
-def mock_sleep_task(seconds, **_):
-    _add_invocation_timestamp()
-    time.sleep(seconds)
-
-
-@operation
-def mock_task_retry(ctx, message, retry_interval=None, **_):
-    _add_invocation_timestamp()
-    retry_kwargs = {}
-    if retry_interval is not None:
-        retry_kwargs['retry_interval'] = retry_interval
-    ctx.task.retry(message, **retry_kwargs)
-
-
-@operation
-def mock_task_abort(ctx, message, **_):
-    _add_invocation_timestamp()
-    ctx.task.abort(message)
-
-
-def _add_invocation_timestamp():
-    invocations = global_test_holder.setdefault('invocations', [])
-    invocations.append(time.time())

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
new file mode 100644
index 0000000..7ffb92a
--- /dev/null
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -0,0 +1,519 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+import threading
+from datetime import datetime
+
+import pytest
+
+from aria.orchestrator import (
+    events,
+    workflow,
+    operation,
+)
+from aria.modeling import models
+from aria.orchestrator.workflows import (
+    api,
+    exceptions,
+)
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import thread
+
+from tests import mock, storage
+
+
+global_test_holder = {}
+
+
+class BaseTest(object):
+
+    @classmethod
+    def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None):
+        eng = cls._engine(workflow_func=workflow_func,
+                          workflow_context=workflow_context,
+                          executor=executor,
+                          executor_kwargs=executor_kwargs)
+        eng.execute()
+        return eng
+
+    @staticmethod
+    def _engine(workflow_func, workflow_context, executor, executor_kwargs=None):
+        graph = workflow_func(ctx=workflow_context)
+        return engine.Engine(executor=executor,
+                             executor_kwargs=executor_kwargs,
+                             workflow_context=workflow_context,
+                             tasks_graph=graph)
+
+    @staticmethod
+    def _op(ctx,
+            func,
+            arguments=None,
+            max_attempts=None,
+            retry_interval=None,
+            ignore_failure=None):
+        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+        interface_name = 'aria.interfaces.lifecycle'
+        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+            name=__name__, func=func))
+        if arguments:
+            # the operation has to declare the arguments before those may be passed
+            operation_kwargs['arguments'] = arguments
+        operation_name = 'create'
+        interface = mock.models.create_interface(node.service, interface_name, operation_name,
+                                                 operation_kwargs=operation_kwargs)
+        node.interfaces[interface.name] = interface
+
+        return api.task.OperationTask(
+            node,
+            interface_name='aria.interfaces.lifecycle',
+            operation_name=operation_name,
+            arguments=arguments,
+            max_attempts=max_attempts,
+            retry_interval=retry_interval,
+            ignore_failure=ignore_failure,
+        )
+
+    @pytest.fixture(autouse=True)
+    def globals_cleanup(self):
+        try:
+            yield
+        finally:
+            global_test_holder.clear()
+
+    @pytest.fixture(autouse=True)
+    def signals_registration(self, ):
+        def sent_task_handler(task, *args, **kwargs):
+            if task.stub_type is None:
+                calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
+                global_test_holder['sent_task_signal_calls'] = calls + 1
+
+        def start_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('start')
+
+        def success_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('success')
+
+        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+            workflow_context.states.append('failure')
+            workflow_context.exception = exception
+
+        def cancel_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('cancel')
+
+        events.start_workflow_signal.connect(start_workflow_handler)
+        events.on_success_workflow_signal.connect(success_workflow_handler)
+        events.on_failure_workflow_signal.connect(failure_workflow_handler)
+        events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
+        events.sent_task_signal.connect(sent_task_handler)
+        try:
+            yield
+        finally:
+            events.start_workflow_signal.disconnect(start_workflow_handler)
+            events.on_success_workflow_signal.disconnect(success_workflow_handler)
+            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+            events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
+            events.sent_task_signal.disconnect(sent_task_handler)
+
+    @pytest.fixture
+    def executor(self):
+        return thread.ThreadExecutor
+
+    @pytest.fixture
+    def workflow_context(self, tmpdir):
+        workflow_context = mock.context.simple(str(tmpdir))
+        workflow_context.states = []
+        workflow_context.exception = None
+        yield workflow_context
+        storage.release_sqlite_storage(workflow_context.model)
+
+
+class TestEngine(BaseTest):
+
+    def test_empty_graph_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(**_):
+            pass
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert 'sent_task_signal_calls' not in global_test_holder
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.SUCCEEDED
+
+    def test_single_task_successful_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(ctx, func=mock_success_task))
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+    def test_single_task_failed_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(ctx, func=mock_failed_task))
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is not None
+        assert execution.status == models.Execution.FAILED
+
+    def test_two_tasks_execution_order(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+            op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+            graph.sequence(op1, op2)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+        @workflow
+        def sub_workflow(ctx, graph):
+            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+            op2 = api.task.StubTask()
+            op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+            graph.sequence(op1, op2, op3)
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+
+class TestCancel(BaseTest):
+
+    def test_cancel_started_execution(self, workflow_context, executor):
+        number_of_tasks = 100
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            operations = (
+                self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
+                for _ in range(number_of_tasks)
+            )
+            return graph.sequence(*operations)
+
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        t = threading.Thread(target=eng.execute)
+        t.start()
+        time.sleep(10)
+        eng.cancel_execution()
+        t.join(timeout=60)  # we need to give this a *lot* of time because Travis can be *very* slow
+        assert not t.is_alive()  # if join times out it will not raise an exception
+        assert workflow_context.states == ['start', 'cancel']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert 0 < len(invocations) < number_of_tasks
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.CANCELLED
+
+    def test_cancel_pending_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(graph, **_):
+            return graph
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        eng.cancel_execution()
+        execution = workflow_context.execution
+        assert execution.status == models.Execution.CANCELLED
+
+
+class TestRetries(BaseTest):
+
+    def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=2)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 2},
+                          max_attempts=2)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=3)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 2},
+                          max_attempts=3)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 3
+        assert global_test_holder.get('sent_task_signal_calls') == 3
+
+    def test_infinite_retries(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=-1)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_retry_interval_float(self, workflow_context, executor):
+        self._test_retry_interval(retry_interval=0.3,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def test_retry_interval_int(self, workflow_context, executor):
+        self._test_retry_interval(retry_interval=1,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def _test_retry_interval(self, retry_interval, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=2,
+                          retry_interval=retry_interval)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        invocation1, invocation2 = invocations
+        assert invocation2 - invocation1 >= retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_ignore_failure(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          ignore_failure=True,
+                          arguments={'failure_count': 100},
+                          max_attempts=100)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 1
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+class TestTaskRetryAndAbort(BaseTest):
+    message = 'EXPECTED_ERROR'
+
+    def test_task_retry_default_interval(self, workflow_context, executor):
+        default_retry_interval = 0.1
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_task_retry,
+                          arguments={'message': self.message},
+                          retry_interval=default_retry_interval,
+                          max_attempts=2)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        invocation1, invocation2 = invocations
+        assert invocation2 - invocation1 >= default_retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_task_retry_custom_interval(self, workflow_context, executor):
+        default_retry_interval = 100
+        custom_retry_interval = 0.1
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_task_retry,
+                          arguments={'message': self.message,
+                                     'retry_interval': custom_retry_interval},
+                          retry_interval=default_retry_interval,
+                          max_attempts=2)
+            graph.add_tasks(op)
+        execution_start = time.time()
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        execution_end = time.time()
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        assert (execution_end - execution_start) < default_retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_task_abort(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_task_abort,
+                          arguments={'message': self.message},
+                          retry_interval=100,
+                          max_attempts=100)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 1
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+@operation
+def mock_success_task(**_):
+    pass
+
+
+@operation
+def mock_failed_task(**_):
+    raise RuntimeError
+
+
+@operation
+def mock_ordered_task(counter, **_):
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(counter)
+
+
+@operation
+def mock_conditional_failure_task(failure_count, **_):
+    invocations = global_test_holder.setdefault('invocations', [])
+    try:
+        if len(invocations) < failure_count:
+            raise RuntimeError
+    finally:
+        invocations.append(time.time())
+
+
+@operation
+def mock_sleep_task(seconds, **_):
+    _add_invocation_timestamp()
+    time.sleep(seconds)
+
+
+@operation
+def mock_task_retry(ctx, message, retry_interval=None, **_):
+    _add_invocation_timestamp()
+    retry_kwargs = {}
+    if retry_interval is not None:
+        retry_kwargs['retry_interval'] = retry_interval
+    ctx.task.retry(message, **retry_kwargs)
+
+
+@operation
+def mock_task_abort(ctx, message, **_):
+    _add_invocation_timestamp()
+    ctx.task.abort(message)
+
+
+def _add_invocation_timestamp():
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(time.time())

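A note on the signal wiring in the relocated test_engine.py above: handlers are
attached to ARIA's blinker-based workflow signals up front and always detached
in a finally block, so one test's handlers cannot leak into the next. A minimal
standalone sketch of the same pattern (the track_start_events name is
illustrative, not part of this commit; the events import path is assumed):

    from contextlib import contextmanager

    from aria.orchestrator import events  # assumed import path


    @contextmanager
    def track_start_events():
        # Collect every workflow_context whose workflow starts while the
        # context manager is active.
        started = []

        def on_start(workflow_context, *args, **kwargs):
            started.append(workflow_context)

        events.start_workflow_signal.connect(on_start)
        try:
            yield started
        finally:
            # Disconnect even on failure so the handler does not outlive
            # the test that installed it.
            events.start_workflow_signal.disconnect(on_start)
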
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index 6d542e9..d63a8ef 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -113,7 +113,7 @@ def run_operation_on_node(ctx, op_name, interface_name):
         operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
     node.interfaces[interface.name] = interface
 
-    eng = engine.Engine(executor=ThreadExecutor(),
+    eng = engine.Engine(executor=ThreadExecutor,
                         workflow_context=ctx,
                        tasks_graph=single_operation_workflow(ctx=ctx,  # pylint: disable=no-value-for-parameter
                                                               node=node,

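The one-line change above is the convention this commit moves to: Engine now
receives the executor class (here ThreadExecutor) instead of a constructed
instance, with instantiation deferred to the engine/task layer. A sketch of
the resulting call site, assuming a workflow context and tasks graph built as
in the tests:

    from aria.orchestrator.workflows.core import engine
    from aria.orchestrator.workflows.executor import thread

    # Previously: engine.Engine(executor=thread.ThreadExecutor(), ...)
    # Now the class itself is passed; per this commit's core changes, the
    # engine/task layer constructs the executor when tasks are executed.
    eng = engine.Engine(executor=thread.ThreadExecutor,
                        workflow_context=workflow_context,
                        tasks_graph=tasks_graph)
    eng.execute()
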
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index ac6d325..07fb8ad 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -34,7 +34,7 @@ class MockTask(object):
         self.exception = None
         self.id = str(uuid.uuid4())
         self.logger = logging.getLogger()
-        self.context = MockContext(storage)
+        self.context = MockContext(self.id, storage)
         self.attempts_count = 1
         self.max_attempts = 1
         self.ignore_failure = False
@@ -46,34 +46,32 @@ class MockTask(object):
         for state in models.Task.STATES:
             setattr(self, state.upper(), state)
 
-    @contextmanager
-    def _update(self):
-        yield self
-
 
 class MockContext(object):
 
-    def __init__(self, storage=None):
+    def __init__(self, task_id, storage=None):
         self.logger = logging.getLogger('mock_logger')
-        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
+        self.task_id = task_id
         self.model = storage
 
     @property
     def serialization_dict(self):
         if self.model:
-            return {'context': self.model.serialization_dict, 'context_cls': self.__class__}
+            context = self.model.serialization_dict
+            context['task_id'] = self.task_id
+            return {'context': context, 'context_cls': self.__class__}
         else:
-            return {'context_cls': self.__class__, 'context': {}}
+            return {'context_cls': self.__class__, 'context': {'task_id': self.task_id}}
 
     def __getattr__(self, item):
         return None
 
     @classmethod
-    def instantiate_from_dict(cls, **kwargs):
+    def instantiate_from_dict(cls, task_id, **kwargs):
         if kwargs:
-            return cls(storage=aria.application_model_storage(**kwargs))
+            return cls(task_id=task_id, storage=aria.application_model_storage(**kwargs))
         else:
-            return cls()
+            return cls(task_id=task_id)
 
     @staticmethod
     def close():

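The MockContext changes above thread task_id through the serialize/rebuild
cycle so a task-specific context can be reconstructed in another process. A
sketch of the storage-less round-trip, using MockContext as defined above:

    # Serialize a storage-less mock context, then rebuild it the way the
    # process executor would from the serialization dict.
    ctx = MockContext(task_id='some-task-id')
    payload = ctx.serialization_dict
    restored = payload['context_cls'].instantiate_from_dict(**payload['context'])
    assert restored.task_id == 'some-task-id'
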
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 3079c60..cfb6975 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -103,7 +103,7 @@ def storage(tmpdir):
 
 @pytest.fixture(params=[
     (thread.ThreadExecutor, {'pool_size': 1}),
-    (thread.ThreadExecutor, {'pool_size': 2}),
+    # (thread.ThreadExecutor, {'pool_size': 2}),
+    # subprocess needs to load a tests module so we explicitly add the root directory as if
     # the project has been installed in editable mode
     # (celery.CeleryExecutor, {'app': app})

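For reference, the params list above makes pytest run each consuming test once
per (executor class, kwargs) tuple; with this commit only the single-threaded
variant stays active. A minimal sketch of such a consuming fixture (names are
illustrative; the real fixture in test_executor.py may differ):

    import pytest

    from aria.orchestrator.workflows.executor import thread


    @pytest.fixture(params=[
        (thread.ThreadExecutor, {'pool_size': 1}),
    ])
    def executor(request):
        executor_cls, kwargs = request.param
        # Build one executor per parameter tuple and close it after the test.
        executor = executor_cls(**kwargs)
        yield executor
        executor.close()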