Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-executor 487d5d2ff -> 6641423f8 (forced update)


Add basic executor mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/6641423f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/6641423f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/6641423f

Branch: refs/heads/wf-executor
Commit: 6641423f87d1766942f12ef5ea9ac0c62600beae
Parents: f2f4131
Author: Dan Kilman <dankil...@gmail.com>
Authored: Thu Oct 13 15:44:58 2016 +0300
Committer: Dan Kilman <dankil...@gmail.com>
Committed: Wed Oct 19 12:05:32 2016 +0300

----------------------------------------------------------------------
 aria/cli/commands.py                            |   6 +-
 aria/events/__init__.py                         |  12 +-
 aria/events/builtin_event_handler.py            |  84 +++++++++
 aria/events/builtin_event_handlers.py           |  44 -----
 aria/events/workflow_engine_event_handler.py    |  70 +++-----
 aria/storage/models.py                          |   7 +-
 aria/tools/module.py                            |  29 ++++
 aria/workflows/builtin/scale.py                 |   2 +-
 aria/workflows/core/__init__.py                 |  14 ++
 aria/workflows/core/engine.py                   |  80 +++++++++
 aria/workflows/core/executor.py                 | 169 +++++++++++++++++++
 aria/workflows/core/tasks.py                    |  61 +++++++
 aria/workflows/core/translation.py              |  83 +++++++++
 aria/workflows/engine/__init__.py               |  14 --
 aria/workflows/engine/engine.py                 | 166 ------------------
 aria/workflows/engine/executor.py               |  87 ----------
 aria/workflows/engine/tasks.py                  |  61 -------
 aria/workflows/engine/translation.py            |  83 ---------
 tests/workflows/test_executor.py                | 118 +++++++++++++
 .../test_task_graph_into_exececution_graph.py   |   2 +-
 20 files changed, 676 insertions(+), 516 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 9fa4911..ab72435 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -26,8 +26,8 @@ from aria.logger import LoggerMixin
 from aria.storage import FileSystemModelDriver, FileSystemResourceDriver
 from aria.tools.application import StorageManager
 from aria.contexts import WorkflowContext
-from aria.workflows.engine.engine import Engine
-from aria.workflows.engine.executor import LocalThreadExecutor
+from aria.workflows.core.engine import Engine
+from aria.workflows.core.executor import ThreadExecutor
 
 from .storage import (
     local_resource_storage,
@@ -225,7 +225,7 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function = self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
-        executor = LocalThreadExecutor()
+        executor = ThreadExecutor()
         workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)

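For reference, the wiring this change introduces boils down to the following sketch; workflow_context and tasks_graph are assumed to be built as in ExecuteCommand above, and the snippet itself is not part of the commit:

    from aria.workflows.core.engine import Engine
    from aria.workflows.core.executor import ThreadExecutor

    executor = ThreadExecutor(pool_size=1)
    engine = Engine(executor=executor,
                    workflow_context=workflow_context,  # assumed built elsewhere
                    tasks_graph=tasks_graph)            # assumed built elsewhere
    try:
        engine.execute()
    finally:
        executor.close()  # stop and join the worker threads
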
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index 70e7e03..c9d7b20 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -20,25 +20,15 @@ from blinker import signal
 from ..tools.plugin import plugin_installer
 
 
-# workflow engine default signals:
+# workflow engine task signals:
 start_task_signal = signal('start_task_signal')
-end_task_signal = signal('end_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
 
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
-end_workflow_signal = signal('end_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')
-start_sub_workflow_signal = signal('start_sub_workflow_signal')
-end_sub_workflow_signal = signal('end_sub_workflow_signal')
-
-# workflow engine operation signals:
-start_operation_signal = signal('start_operation_signal')
-end_operation_signal = signal('end_operation_signal')
-on_success_operation_signal = signal('on_success_operation_signal')
-on_failure_operation_signal = signal('on_failure_operation_signal')
 
 plugin_installer(
     path=os.path.dirname(os.path.realpath(__file__)),

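The remaining signals are ordinary blinker signals: handlers attach with connect, and publishers fire send with the task (or workflow context) as the sender. A minimal sketch, using a stand-in task object:

    from aria.events import start_task_signal

    @start_task_signal.connect
    def _on_task_start(task, *args, **kwargs):
        # blinker passes the sender as the first positional argument
        print('started: {0}'.format(task.name))

    # publishers fire the signal with the task as the sender:
    # start_task_signal.send(task)
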
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py
new file mode 100644
index 0000000..404cc01
--- /dev/null
+++ b/aria/events/builtin_event_handler.py
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from . import (
+    start_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal,
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
+)
+
+
+@start_task_signal.connect
+def _task_started(task, *args, **kwargs):
+    operation_context = task.context
+    operation = operation_context.operation
+    operation.started_at = datetime.utcnow()
+    operation.status = operation.STARTED
+    operation_context.operation = operation
+
+
+@on_failure_task_signal.connect
+def _task_failed(task, *args, **kwargs):
+    operation_context = task.context
+    operation = operation_context.operation
+    operation.ended_at = datetime.utcnow()
+    operation.status = operation.FAILED
+    operation_context.operation = operation
+
+
+@on_success_task_signal.connect
+def _task_succeeded(task, *args, **kwargs):
+    operation_context = task.context
+    operation = operation_context.operation
+    operation.ended_at = datetime.utcnow()
+    operation.status = operation.SUCCESS
+    operation_context.operation = operation
+
+
+@start_workflow_signal.connect
+def _workflow_started(workflow_context, *args, **kwargs):
+    Execution = workflow_context.storage.execution.model_cls
+    execution = Execution(
+        id=workflow_context.execution_id,
+        deployment_id=workflow_context.deployment_id,
+        workflow_id=workflow_context.workflow_id,
+        blueprint_id=workflow_context.blueprint_id,
+        status=Execution.PENDING,
+        started_at=datetime.utcnow(),
+        parameters=workflow_context.parameters,
+    )
+    workflow_context.execution = execution
+
+
+@on_failure_workflow_signal.connect
+def _workflow_failed(workflow_context, exception, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.error = str(exception)
+    execution.status = execution.FAILED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.execution = execution
+
+
+@on_success_workflow_signal.connect
+def _workflow_succeeded(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.status = execution.TERMINATED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.execution = execution

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/events/builtin_event_handlers.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handlers.py b/aria/events/builtin_event_handlers.py
deleted file mode 100644
index 59f59c1..0000000
--- a/aria/events/builtin_event_handlers.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ..storage.models import NodeInstance
-from . import start_operation_signal
-
-
-class _OperationToNodeInstanceState(dict):
-    def __missing__(self, key):
-        for cached_key, value in self.items():
-            if key.startswith(cached_key):
-                return value
-        raise KeyError(key)
-
-_operation_to_node_instance_state = _OperationToNodeInstanceState({
-    'cloudify.interfaces.lifecycle.create': NodeInstance.INITIALIZING,
-    'cloudify.interfaces.lifecycle.configure': NodeInstance.CONFIGURING,
-    'cloudify.interfaces.lifecycle.start': NodeInstance.STARTING,
-    'cloudify.interfaces.lifecycle.stop': NodeInstance.STOPPING,
-    'cloudify.interfaces.lifecycle.delete': NodeInstance.DELETING
-})
-
-
-@start_operation_signal.connect
-def _update_node_instance_state(sender, **kwargs):
-    try:
-        next_state = _operation_to_node_instance_state[sender.task_name]
-    except KeyError:
-        return
-    node_instance = sender.context.node_instance
-    node_instance.state = next_state
-    sender.context.storage.node_instance.store(node_instance)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py
index 59bed99..6916206 100644
--- a/aria/events/workflow_engine_event_handler.py
+++ b/aria/events/workflow_engine_event_handler.py
@@ -14,61 +14,47 @@
 # limitations under the License.
 
 from . import (
-    start_operation_signal,
-    end_operation_signal,
-    on_success_operation_signal,
-    on_failure_operation_signal,
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
     start_workflow_signal,
-    end_workflow_signal,
-    start_sub_workflow_signal,
-    end_sub_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal
 )
 
 
-@start_operation_signal.connect
-def start_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting operation: {sender.task_name}'.format(sender=sender))
+@start_task_signal.connect
+def start_task_handler(task, **kwargs):
+    task.logger.debug(
+        'Event: Starting task: {task.name}'.format(task=task))
 
 
-@end_operation_signal.connect
-def end_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished operation: {sender.task_name}'.format(sender=sender))
+@on_success_task_signal.connect
+def success_task_handler(task, **kwargs):
+    task.logger.debug(
+        'Event: Task success: {task.name}'.format(task=task))
 
 
-@on_success_operation_signal.connect
-def success_operation_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - operation success: {sender.task_name}'.format(sender=sender))
-
-
-@on_failure_operation_signal.connect
-def failure_operation_handler(sender, **kwargs):
-    sender.context.logger.error(
-        'Event - operation failure: {sender.task_name}'.format(sender=sender),
+@on_failure_task_signal.connect
+def failure_operation_handler(task, **kwargs):
+    task.logger.error(
+        'Event: Task failure: {task.name}'.format(task=task),
         exc_info=kwargs.get('exception', True))
 
 
 @start_workflow_signal.connect
-def start_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting workflow: {sender.task_name}'.format(sender=sender))
-
-
-@end_workflow_signal.connect
-def end_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished workflow: {sender.task_name}'.format(sender=sender))
+def start_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Starting workflow: {context.name}'.format(context=context))
 
 
-@start_sub_workflow_signal.connect
-def start_sub_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - starting sub workflow: {sender.task_name}'.format(sender=sender))
+@on_failure_workflow_signal.connect
+def failure_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Workflow failure: {context.name}'.format(context=context))
 
 
-@end_sub_workflow_signal.connect
-def end_sub_workflow_handler(sender, **kwargs):
-    sender.context.logger.debug(
-        'Event - finished sub workflow: {sender.task_name}'.format(sender=sender))
+@on_success_workflow_signal.connect
+def success_workflow_handler(context, **kwargs):
+    context.logger.debug(
+        'Event: Workflow success: {context.name}'.format(context=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index d3cb3f7..d96c74a 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -191,10 +191,11 @@ class Execution(Model):
     deployment_id = Field(type=basestring)
     workflow_id = Field(type=basestring)
     blueprint_id = Field(type=basestring)
-    created_at = Field(type=datetime)
-    error = Field()
+    started_at = Field(type=datetime)
+    ended_at = Field(type=datetime, default=None)
+    error = Field(type=basestring, default=None)
     parameters = Field()
-    is_system_workflow = Field(type=bool)
+    is_system_workflow = Field(type=bool, default=False)
 
 
 class Operation(Model):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/tools/module.py
----------------------------------------------------------------------
diff --git a/aria/tools/module.py b/aria/tools/module.py
new file mode 100644
index 0000000..535f7aa
--- /dev/null
+++ b/aria/tools/module.py
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import importlib
+
+
+def load_attribute(attribute_path):
+    module_name, attribute_name = attribute_path.rsplit('.', 1)
+    try:
+        module = importlib.import_module(module_name)
+        return getattr(module, attribute_name)
+    except ImportError:
+        # TODO: handle
+        raise
+    except AttributeError:
+        # TODO: handle
+        raise

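load_attribute resolves a 'module.attribute' dotted path with importlib; this is how the executors below locate the operation callable. For example, against the stdlib:

    from aria.tools.module import load_attribute
    import os.path

    # returns the attribute object itself, not a copy
    assert load_attribute('os.path.join') is os.path.join
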
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/builtin/scale.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/scale.py b/aria/workflows/builtin/scale.py
index e8788c9..c064840 100644
--- a/aria/workflows/builtin/scale.py
+++ b/aria/workflows/builtin/scale.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from aria import workflow
-from aria.workflows.engine.engine import Engine
+from aria.workflows.core.engine import Engine
 from .install import install
 from .uninstall import uninstall
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/aria/workflows/core/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
new file mode 100644
index 0000000..7cc4781
--- /dev/null
+++ b/aria/workflows/core/engine.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import networkx
+
+from aria import events, logger
+
+from . import translation
+
+
+class Engine(logger.LoggerMixin):
+
+    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
+        super(Engine, self).__init__(**kwargs)
+        self._workflow_context = workflow_context
+        self._tasks_graph = tasks_graph
+        self._execution_graph = networkx.DiGraph()
+        self._executor = executor
+        translation.build_execution_graph(task_graph=self._tasks_graph,
+                                          workflow_context=workflow_context,
+                                          execution_graph=self._execution_graph)
+
+    def execute(self):
+        try:
+            events.start_workflow_signal.send(self._workflow_context)
+            while True:
+                for task in self._ended_tasks():
+                    self._handle_ended_tasks(task)
+                for task in self._executable_tasks():
+                    self._handle_executable_task(task)
+                if self._all_tasks_consumed():
+                    break
+                else:
+                    time.sleep(0.1)
+            events.on_success_workflow_signal.send(self._workflow_context)
+        except BaseException as e:
+            events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
+            raise
+
+    def _executable_tasks(self):
+        now = time.time()
+        return (task for task in self._tasks_iter()
+                if task.status == task.PENDING and
+                task.eta <= now and
+                not self._task_has_dependencies(task))
+
+    def _ended_tasks(self):
+        return (task for task in self._tasks_iter() if task.status in task.END_STATES)
+
+    def _task_has_dependencies(self, task):
+        return len(self._execution_graph.succ.get(task.id, {})) > 0
+
+    def _all_tasks_consumed(self):
+        return len(self._execution_graph.node) == 0
+
+    def _tasks_iter(self):
+        return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
+
+    def _handle_executable_task(self, task):
+        self._executor.execute(task)
+
+    def _handle_ended_tasks(self, task):
+        if task.status == task.FAILED:
+            raise RuntimeError('Workflow failed')
+        else:
+            self._execution_graph.remove_node(task.id)

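The engine keeps each task object as node data in a networkx DiGraph and polls until the graph is drained; ended tasks are removed from the graph, which is what eventually unblocks the tasks wired to them. A minimal sketch of that storage pattern, assuming the networkx 1.x API used here (nodes_iter and the node dict are gone in networkx 2.x):

    import networkx

    graph = networkx.DiGraph()
    graph.add_node('t1', task='<task object 1>')
    graph.add_node('t2', task='<task object 2>')
    graph.add_edge('t1', 't2')

    # iterate the stored task objects, as _tasks_iter does
    tasks = [data['task'] for _, data in graph.nodes_iter(data=True)]

    # consume an ended task, as _handle_ended_tasks does
    graph.remove_node('t1')
    assert len(graph.node) == 1
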
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/core/executor.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/executor.py b/aria/workflows/core/executor.py
new file mode 100644
index 0000000..0c3aeb1
--- /dev/null
+++ b/aria/workflows/core/executor.py
@@ -0,0 +1,169 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import threading
+import multiprocessing
+import Queue
+
+import jsonpickle
+
+from aria import events
+from aria.tools import module
+
+
+class BaseExecutor(object):
+
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def execute(self, task):
+        raise NotImplementedError
+
+    def close(self):
+        pass
+
+    @staticmethod
+    def _task_started(task):
+        events.start_task_signal.send(task)
+
+    @staticmethod
+    def _task_failed(task, exception):
+        events.on_failure_task_signal.send(task, exception=exception)
+
+    @staticmethod
+    def _task_succeeded(task):
+        events.on_success_task_signal.send(task)
+
+
+class CurrentThreadBlockingExecutor(BaseExecutor):
+
+    def execute(self, task):
+        self._task_started(task)
+        try:
+            operation_context = task.context
+            task_func = module.load_attribute(operation_context.operation_details['operation'])
+            task_func(**operation_context.inputs)
+            self._task_succeeded(task)
+        except BaseException as e:
+            self._task_failed(task, exception=e)
+
+
+class ThreadExecutor(BaseExecutor):
+
+    def __init__(self, pool_size=1, *args, **kwargs):
+        super(ThreadExecutor, self).__init__(*args, **kwargs)
+        self._stopped = False
+        self._queue = Queue.Queue()
+        self._pool = []
+        for i in range(pool_size):
+            name = 'ThreadExecutor-{index}'.format(index=i+1)
+            thread = threading.Thread(target=self._processor, name=name)
+            thread.daemon = True
+            thread.start()
+            self._pool.append(thread)
+
+    def execute(self, task):
+        self._queue.put(task)
+
+    def close(self):
+        self._stopped = True
+        for thread in self._pool:
+            thread.join()
+
+    def _processor(self):
+        while not self._stopped:
+            try:
+                task = self._queue.get(timeout=1)
+                self._task_started(task)
+                try:
+                    operation_context = task.context
+                    task_func = module.load_attribute(
+                        operation_context.operation_details['operation'])
+                    task_func(**operation_context.inputs)
+                    self._task_succeeded(task)
+                except BaseException as e:
+                    self._task_failed(task, exception=e)
+            # Queue.get raised Queue.Empty on timeout; loop and re-check _stopped
+            except:
+                pass
+
+
+class MultiprocessExecutor(BaseExecutor):
+
+    def __init__(self, pool_size=1, *args, **kwargs):
+        super(MultiprocessExecutor, self).__init__(*args, **kwargs)
+        self._stopped = False
+        self._manager = multiprocessing.Manager()
+        self._queue = self._manager.Queue()
+        self._tasks = {}
+        self._listener = threading.Thread(target=self._listener)
+        self._listener.daemon = True
+        self._listener.start()
+        self._pool = multiprocessing.Pool(processes=pool_size,
+                                          maxtasksperchild=1)
+
+    def execute(self, task):
+        self._tasks[task.id] = task
+        self._pool.apply_async(_multiprocess_handler, args=(
+            self._queue,
+            task.id,
+            task.context.operation_details,
+            task.context.inputs))
+
+    def close(self):
+        self._pool.close()
+        self._stopped = True
+        self._pool.join()
+        self._listener.join()
+
+    def _listener(self):
+        while not self._stopped:
+            try:
+                message = self._queue.get(timeout=1)
+                if message.type == 'task_started':
+                    self._task_started(self._tasks[message.task_id])
+                elif message.type == 'task_succeeded':
+                    self._task_succeeded(self._remove_task(message.task_id))
+                elif message.type == 'task_failed':
+                    self._task_failed(self._remove_task(message.task_id),
+                                      exception=jsonpickle.loads(message.exception))
+                else:
+                    # TODO: something
+                    raise RuntimeError()
+            # Queue.get raised Queue.Empty on timeout; loop and re-check _stopped
+            except:
+                pass
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
+
+
+class _MultiprocessMessage(object):
+
+    def __init__(self, type, task_id, exception=None):
+        self.type = type
+        self.task_id = task_id
+        self.exception = exception
+
+
+def _multiprocess_handler(queue, task_id, operation_details, operation_inputs):
+    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+    try:
+        task_func = module.load_attribute(operation_details['operation'])
+        task_func(**operation_inputs)
+        queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
+    except BaseException as e:
+        queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
+                                       exception=jsonpickle.dumps(e)))

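All three executors share the BaseExecutor signal hooks, so the engine can use them interchangeably. A hedged usage sketch; task is assumed to be an OperationTask-like object whose context carries operation_details and inputs (see the test file below for a concrete mock):

    from aria.workflows.core.executor import ThreadExecutor

    executor = ThreadExecutor(pool_size=2)
    try:
        executor.execute(task)  # enqueue; a pool thread runs the operation
    finally:
        executor.close()        # set the stop flag and join the pool
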
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/core/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py
new file mode 100644
index 0000000..83b4263
--- /dev/null
+++ b/aria/workflows/core/tasks.py
@@ -0,0 +1,61 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class BaseTask(object):
+
+    def __init__(self, id, name, context):
+        self.id = id
+        self.name = name
+        self.context = context
+
+
+class StartWorkflowTask(BaseTask):
+    pass
+
+
+class EndWorkflowTask(BaseTask):
+    pass
+
+
+class StartSubWorkflowTask(BaseTask):
+    pass
+
+
+class EndSubWorkflowTask(BaseTask):
+    pass
+
+
+class OperationTask(BaseTask):
+    def __init__(self, *args, **kwargs):
+        super(OperationTask, self).__init__(*args, **kwargs)
+        self._create_operation_in_storage()
+
+    def _create_operation_in_storage(self):
+        Operation = self.context.storage.operation.model_cls
+        operation = Operation(
+            id=self.context.id,
+            execution_id=self.context.execution_id,
+            max_retries=self.context.parameters.get('max_retries', 1),
+            status=Operation.PENDING,
+        )
+        self.context.operation = operation
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.context, attr)
+        except AttributeError:
+            return super(OperationTask, self).__getattribute__(attr)
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py
new file mode 100644
index 0000000..71d7bcd
--- /dev/null
+++ b/aria/workflows/core/translation.py
@@ -0,0 +1,83 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria import contexts
+
+from . import tasks
+
+
+def build_execution_graph(
+        task_graph,
+        workflow_context,
+        execution_graph,
+        start_cls=tasks.StartWorkflowTask,
+        end_cls=tasks.EndWorkflowTask,
+        depends_on=()):
+    # Insert start marker
+    start_task = start_cls(id=_start_graph_suffix(task_graph.id),
+                           name=_start_graph_suffix(task_graph.name),
+                           context=workflow_context)
+    _add_task_and_dependencies(execution_graph, start_task, depends_on)
+
+    for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True):
+        operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task])
+
+        if _is_operation(operation_or_workflow):
+            # Add the task and its dependencies
+            operation_task = tasks.OperationTask(id=operation_or_workflow.id,
+                                                 name=operation_or_workflow.name,
+                                                 context=operation_or_workflow)
+            _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
+        else:
+            # Build the graph recursively while adding start and end markers
+            build_execution_graph(
+                task_graph=operation_or_workflow,
+                workflow_context=workflow_context,
+                execution_graph=execution_graph,
+                start_cls=tasks.StartSubWorkflowTask,
+                end_cls=tasks.EndSubWorkflowTask,
+                depends_on=operation_dependencies
+            )
+
+    # Insert end marker
+    workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task])
+    end_task = end_cls(id=_end_graph_suffix(task_graph.id), name=_end_graph_suffix(task_graph.name), context=workflow_context)
+    _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
+
+
+def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()):
+    execution_graph.add_node(operation_task.id, task=operation_task)
+    for dependency in operation_dependencies:
+        execution_graph.add_edge(dependency.id, operation_task.id)
+
+
+def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
+    """
+    Returns task list from dependencies.
+    """
+    return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_suffix(dependency.id)]
+            ['task'] for dependency in dependencies] or default
+
+
+def _is_operation(task):
+    return isinstance(task, contexts.OperationContext)
+
+
+def _start_graph_suffix(id):
+    return '{0}-Start'.format(id)
+
+
+def _end_graph_suffix(id):
+    return '{0}-End'.format(id)

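Each (sub)workflow is bracketed by synthetic start/end marker tasks whose ids and names derive from the task graph's own; concretely:

    from aria.workflows.core import translation

    # marker naming used when inserting start/end nodes
    assert translation._start_graph_suffix('wf-1') == 'wf-1-Start'
    assert translation._end_graph_suffix('wf-1') == 'wf-1-End'
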
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/engine/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/__init__.py b/aria/workflows/engine/__init__.py
deleted file mode 100644
index ae1e83e..0000000
--- a/aria/workflows/engine/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/engine/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/engine.py b/aria/workflows/engine/engine.py
deleted file mode 100644
index 508ae3b..0000000
--- a/aria/workflows/engine/engine.py
+++ /dev/null
@@ -1,166 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-from datetime import datetime
-
-from contextlib import contextmanager
-from networkx import DiGraph
-
-from aria.events import (
-    start_workflow_signal,
-    on_success_workflow_signal,
-    on_failure_workflow_signal,
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-)
-from aria.logger import LoggerMixin
-
-from .translation import build_execution_graph
-
-
-from ...storage import Model
-
-
-class Engine(LoggerMixin):
-
-    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
-        super(Engine, self).__init__(**kwargs)
-        self._workflow_context = workflow_context
-        self._tasks_graph = tasks_graph
-        self._execution_graph = DiGraph()
-        self._executor = executor
-        build_execution_graph(task_graph=self._tasks_graph,
-                              workflow_context=workflow_context,
-                              execution_graph=self._execution_graph)
-
-    def execute(self):
-        execution_id = self._workflow_context.execution_id
-        with self._connect_signals():
-            try:
-                start_workflow_signal.send(self, execution_id=execution_id)
-                while True:
-                    for task in self._ended_tasks():
-                        self._handle_ended_tasks(task)
-                    for task in self._executable_tasks():
-                        self._handle_executable_task(task)
-                    if self._all_tasks_consumed():
-                        break
-                    else:
-                        time.sleep(0.1)
-                on_success_workflow_signal.send(self, execution_id=execution_id)
-            except BaseException as e:
-                on_failure_workflow_signal.send(self, execution_id=execution_id, exception=e)
-                raise
-
-    def _executable_tasks(self):
-        now = time.time()
-        return (task for task in self._tasks_iter()
-                if task.status == task.PENDING and
-                task.eta <= now and
-                not self._task_has_dependencies(task))
-
-    def _ended_tasks(self):
-        return (task for task in self._tasks_iter() if task.status in task.END_STATES)
-
-    def _task_has_dependencies(self, task):
-        return len(self._execution_graph.succ.get(task.id, {})) > 0
-
-    def _all_tasks_consumed(self):
-        len(self._execution_graph.node) == 0
-
-    def _tasks_iter(self):
-        return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
-
-    def _get_task(self, task_id):
-        return self._execution_graph.node[task_id]['task']
-
-    def _handle_executable_task(self, task):
-        self._executor.execute(task)
-
-    def _handle_ended_tasks(self, task):
-        if task.status == task.FAILED:
-            raise RuntimeError('Workflow failed')
-        else:
-            self._execution_graph.remove_node(task.id)
-
-    def _task_started_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.started_at = datetime.utcnow()
-        operation.status = operation.STARTED
-        operation_context.operation = operation
-
-    def _task_failed_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.ended_at = datetime.utcnow()
-        operation.status = operation.FAILED
-        operation_context.operation = operation
-
-    def _task_succeeded_receiver(self, task_id, *args, **kwargs):
-        task = self._get_task(task_id)
-        operation_context = task.operation_context
-        operation = operation_context.operation
-        operation.ended_at = datetime.utcnow()
-        operation.status = operation.SUCCESS
-        operation_context.operation = operation
-
-    def _start_workflow_receiver(self, *args, **kwargs):
-        Execution = self._workflow_context.storage.execution.model_cls
-        execution = Execution(
-            id=self._workflow_context.execution_id,
-            deployment_id=self._workflow_context.deployment_id,
-            workflow_id=self._workflow_context.workflow_id,
-            blueprint_id=self._workflow_context.blueprint_id,
-            status=Execution.PENDING,
-            created_at=datetime.utcnow(),
-            error='',
-            parameters=self._workflow_context.parameters,
-            is_system_workflow=False
-        )
-        self._workflow_context.execution = execution
-
-    def _workflow_failed_receiver(self, exception, *args, **kwargs):
-        execution = self._workflow_context.execution
-        execution.error = str(exception)
-        execution.status = execution.FAILED
-        self._workflow_context.execution = execution
-
-    def _workflow_succeeded_receiver(self, *args, **kwargs):
-        execution = self._workflow_context.execution
-        execution.status = execution.TERMINATED
-        self._workflow_context.execution = execution
-
-    @contextmanager
-    def _connect_signals(self):
-        start_workflow_signal.connect(self._start_workflow_receiver)
-        on_success_workflow_signal.connect(self._workflow_succeeded_receiver)
-        on_failure_workflow_signal.connect(self._workflow_failed_receiver)
-        start_task_signal.connect(self._task_started_receiver)
-        on_success_task_signal.connect(self._task_succeeded_receiver)
-        on_failure_task_signal.connect(self._task_failed_receiver)
-        try:
-            yield
-        finally:
-            start_workflow_signal.disconnect(self._start_workflow_receiver)
-            on_success_workflow_signal.disconnect(self._workflow_succeeded_receiver)
-            on_failure_workflow_signal.disconnect(self._workflow_failed_receiver)
-            start_task_signal.disconnect(self._task_started_receiver)
-            on_success_task_signal.disconnect(self._task_succeeded_receiver)
-            on_failure_task_signal.disconnect(self._task_failed_receiver)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/engine/executor.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/executor.py b/aria/workflows/engine/executor.py
deleted file mode 100644
index dacfc15..0000000
--- a/aria/workflows/engine/executor.py
+++ /dev/null
@@ -1,87 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import threading
-import Queue
-from importlib import import_module
-
-from aria.events import (
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-)
-
-
-class Executor(object):
-
-    def execute(self, task):
-        raise NotImplementedError
-
-    def task_started(self, task_id):
-        start_task_signal.send(self, task_id=task_id)
-
-    def task_failed(self, task_id, exception):
-        on_failure_task_signal.send(self, task_id=task_id, exception=exception)
-
-    def task_succeeded(self, task_id):
-        on_success_task_signal.send(self, task_id=task_id)
-
-
-class LocalThreadExecutor(Executor):
-
-    def __init__(self, pool_size=1):
-        self.stopped = False
-        self.queue = Queue.Queue()
-        self.pool = []
-        for i in range(pool_size):
-            name = 'LocalThreadExecutor-{index}'.format(index=i+1)
-            thread = threading.Thread(target=self._processor, name=name)
-            thread.daemon = True
-            thread.start()
-            self.pool.append(thread)
-
-    def execute(self, task):
-        self.queue.put(task)
-
-    def close(self):
-        self.stopped = True
-
-    def _processor(self):
-        while not self.stopped:
-            try:
-                task = self.queue.get(timeout=1)
-                self.task_started(task.id)
-                try:
-                    operation_context = task.operation_context
-                    task_func = self._load_task(operation_context.operation_details['operation'])
-                    task_func(**operation_context.inputs)
-                    self.task_succeeded(task.id)
-                except BaseException as e:
-                    self.task_failed(task.id, exception=e)
-            # Daemon threads
-            except:
-                pass
-
-    def _load_task(self, handler_path):
-        module_name, spec_handler_name = handler_path.rsplit('.', 1)
-        try:
-            module = import_module(module_name)
-            return getattr(module, spec_handler_name)
-        except ImportError:
-            # TODO: handle
-            raise
-        except AttributeError:
-            # TODO: handle
-            raise

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/engine/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/tasks.py b/aria/workflows/engine/tasks.py
deleted file mode 100644
index 83b4263..0000000
--- a/aria/workflows/engine/tasks.py
+++ /dev/null
@@ -1,61 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-class BaseTask(object):
-
-    def __init__(self, id, name, context):
-        self.id = id
-        self.name = name
-        self.context = context
-
-
-class StartWorkflowTask(BaseTask):
-    pass
-
-
-class EndWorkflowTask(BaseTask):
-    pass
-
-
-class StartSubWorkflowTask(BaseTask):
-    pass
-
-
-class EndSubWorkflowTask(BaseTask):
-    pass
-
-
-class OperationTask(BaseTask):
-    def __init__(self, *args, **kwargs):
-        super(OperationTask, self).__init__(*args, **kwargs)
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        Operation = self.context.storage.operation.model_cls
-        operation = Operation(
-            id=self.context.id,
-            execution_id=self.context.execution_id,
-            max_retries=self.context.parameters.get('max_retries', 1),
-            status=Operation.PENDING,
-        )
-        self.context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.context, attr)
-        except AttributeError:
-            return super(OperationTask, self).__getattribute__(attr)
-

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/aria/workflows/engine/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/translation.py b/aria/workflows/engine/translation.py
deleted file mode 100644
index 71d7bcd..0000000
--- a/aria/workflows/engine/translation.py
+++ /dev/null
@@ -1,83 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from aria import contexts
-
-from . import tasks
-
-
-def build_execution_graph(
-        task_graph,
-        workflow_context,
-        execution_graph,
-        start_cls=tasks.StartWorkflowTask,
-        end_cls=tasks.EndWorkflowTask,
-        depends_on=()):
-    # Insert start marker
-    start_task = start_cls(id=_start_graph_suffix(task_graph.id),
-                           name=_start_graph_suffix(task_graph.name),
-                           context=workflow_context)
-    _add_task_and_dependencies(execution_graph, start_task, depends_on)
-
-    for operation_or_workflow, dependencies in task_graph.task_tree(reverse=True):
-        operation_dependencies = _get_tasks_from_dependencies(execution_graph, dependencies, default=[start_task])
-
-        if _is_operation(operation_or_workflow):
-            # Add the task an the dependencies
-            operation_task = tasks.OperationTask(id=operation_or_workflow.id,
-                                                 name=operation_or_workflow.name,
-                                                 context=operation_or_workflow)
-            _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
-        else:
-            # Built the graph recursively while adding start and end markers
-            build_execution_graph(
-                task_graph=operation_or_workflow,
-                workflow_context=workflow_context,
-                execution_graph=execution_graph,
-                start_cls=tasks.StartSubWorkflowTask,
-                end_cls=tasks.EndSubWorkflowTask,
-                depends_on=operation_dependencies
-            )
-
-    # Insert end marker
-    workflow_dependencies = _get_tasks_from_dependencies(execution_graph, task_graph.leaf_tasks, default=[start_task])
-    end_task = end_cls(id=_end_graph_suffix(task_graph.id), name=_end_graph_suffix(task_graph.name), context=workflow_context)
-    _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
-
-
-def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()):
-    execution_graph.add_node(operation_task.id, task=operation_task)
-    for dependency in operation_dependencies:
-        execution_graph.add_edge(dependency.id, operation_task.id)
-
-
-def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
-    """
-    Returns task list from dependencies.
-    """
-    return [execution_graph.node[dependency.id if _is_operation(dependency) else _end_graph_suffix(dependency.id)]
-            ['task'] for dependency in dependencies] or default
-
-
-def _is_operation(task):
-    return isinstance(task, contexts.OperationContext)
-
-
-def _start_graph_suffix(id):
-    return '{0}-Start'.format(id)
-
-
-def _end_graph_suffix(id):
-    return '{0}-End'.format(id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/tests/workflows/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_executor.py b/tests/workflows/test_executor.py
new file mode 100644
index 0000000..16bb900
--- /dev/null
+++ b/tests/workflows/test_executor.py
@@ -0,0 +1,118 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import uuid
+
+import pytest
+import retrying
+
+from aria import events
+from aria.storage import models
+from aria.workflows.core import executor
+
+
+class TestExecutor(object):
+
+    @pytest.mark.parametrize('pool_size,executor_cls', [
+        (1, executor.ThreadExecutor),
+        (2, executor.ThreadExecutor),
+        (1, executor.MultiprocessExecutor),
+        (2, executor.MultiprocessExecutor),
+        (0, executor.CurrentThreadBlockingExecutor)
+    ])
+    def test_execute(self, pool_size, executor_cls):
+        self.executor = executor_cls(pool_size)
+        expected_value = 'value'
+        successful_task = MockTask(mock_successful_task)
+        failing_task = MockTask(mock_failing_task)
+        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+        for task in [successful_task, failing_task, task_with_inputs]:
+            self.executor.execute(task)
+
+        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+        def assertion():
+            assert successful_task.states == ['start', 'success']
+            assert failing_task.states == ['start', 'failure']
+            assert task_with_inputs.states == ['start', 'failure']
+            assert isinstance(failing_task.exception, TestException)
+            assert isinstance(task_with_inputs.exception, TestException)
+            assert task_with_inputs.exception.message == expected_value
+        assertion()
+
+    def setup_method(self):
+        self.executor = None
+        events.start_task_signal.connect(start_handler)
+        events.on_success_task_signal.connect(success_handler)
+        events.on_failure_task_signal.connect(failure_handler)
+
+    def teardown_method(self):
+        events.start_task_signal.disconnect(start_handler)
+        events.on_success_task_signal.disconnect(success_handler)
+        events.on_failure_task_signal.disconnect(failure_handler)
+        if self.executor:
+            self.executor.close()
+
+
+def mock_successful_task():
+    pass
+
+
+def mock_failing_task():
+    raise TestException
+
+
+def mock_task_with_input(input):
+    raise TestException(input)
+
+
+class TestException(Exception):
+    pass
+
+
+class MockContext(object):
+
+    def __init__(self, operation_details, inputs):
+        self.operation_details = operation_details
+        self.inputs = inputs
+        self.operation = models.Operation(execution_id='')
+
+
+class MockTask(object):
+
+    def __init__(self, func, inputs=None):
+        self.states = []
+        self.exception = None
+        self.id = str(uuid.uuid4())
+        name = func.__name__
+        operation = 'tests.workflows.test_executor.{name}'.format(name=name)
+        self.context = MockContext(operation_details={'operation': operation},
+                                   inputs=inputs or {})
+        self.logger = logging.getLogger()
+        self.name = name
+
+
+def start_handler(task, *args, **kwargs):
+    task.states.append('start')
+
+
+def success_handler(task, *args, **kwargs):
+    task.states.append('success')
+
+
+def failure_handler(task, exception, *args, **kwargs):
+    task.states.append('failure')
+    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/6641423f/tests/workflows/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py 
b/tests/workflows/test_task_graph_into_exececution_graph.py
index 28f31a0..125097e 100644
--- a/tests/workflows/test_task_graph_into_exececution_graph.py
+++ b/tests/workflows/test_task_graph_into_exececution_graph.py
@@ -18,7 +18,7 @@ from networkx import topological_sort, DiGraph
 
 from aria import contexts
 from aria.workflows.api import tasks_graph
-from aria.workflows.engine import tasks, translation
+from aria.workflows.core import tasks, translation
 
 
 @pytest.fixture(autouse=True)
