Repository: incubator-ariatosca
Updated Branches:
  refs/heads/wf-wip c52f949e9 -> b53aa1f3b


implement basic local thread executor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b53aa1f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b53aa1f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b53aa1f3

Branch: refs/heads/wf-wip
Commit: b53aa1f3b7c5f3685c24c367143a5c47c1957526
Parents: c52f949
Author: Dan Kilman <dankil...@gmail.com>
Authored: Thu Oct 13 15:44:58 2016 +0300
Committer: Dan Kilman <dankil...@gmail.com>
Committed: Thu Oct 13 15:44:58 2016 +0300

----------------------------------------------------------------------
 aria/cli/commands.py              |  6 ++--
 aria/workflows/engine/executor.py | 50 ++++++++++++++++++++++++++++++++--
 2 files changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b53aa1f3/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 465569f..9fa4911 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -27,7 +27,7 @@ from aria.storage import FileSystemModelDriver, 
FileSystemResourceDriver
 from aria.tools.application import StorageManager
 from aria.contexts import WorkflowContext
 from aria.workflows.engine.engine import Engine
-from aria.workflows.engine.executor import LocalExecutor
+from aria.workflows.engine.executor import LocalThreadExecutor
 
 from .storage import (
     local_resource_storage,
@@ -225,10 +225,12 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function = self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context, 
**workflow_context.parameters)
-        workflow_engine = Engine(executor=LocalExecutor(),
+        executor = LocalThreadExecutor()
+        workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)
         workflow_engine.execute()
+        executor.close()
 
     def _merge_and_validate_execution_parameters(
             self,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b53aa1f3/aria/workflows/engine/executor.py
----------------------------------------------------------------------
diff --git a/aria/workflows/engine/executor.py 
b/aria/workflows/engine/executor.py
index 1b9f276..dacfc15 100644
--- a/aria/workflows/engine/executor.py
+++ b/aria/workflows/engine/executor.py
@@ -13,6 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import threading
+import Queue
+from importlib import import_module
+
 from aria.events import (
     start_task_signal,
     on_success_task_signal,
@@ -35,7 +39,49 @@ class Executor(object):
         on_success_task_signal.send(self, task_id=task_id)
 
 
-class LocalExecutor(Executor):
class LocalThreadExecutor(Executor):
    """Executor that runs tasks on a pool of local daemon threads.

    Tasks submitted via ``execute`` are placed on an internal queue and
    picked up by one of ``pool_size`` worker threads. Task lifecycle
    notifications (started/succeeded/failed) are emitted through the
    base ``Executor`` signal helpers.
    """

    def __init__(self, pool_size=1):
        """
        :param pool_size: number of worker threads to start (default: 1)
        """
        self.stopped = False
        self.queue = Queue.Queue()
        self.pool = []
        for i in range(pool_size):
            name = 'LocalThreadExecutor-{index}'.format(index=i+1)
            thread = threading.Thread(target=self._processor, name=name)
            thread.daemon = True
            thread.start()
            self.pool.append(thread)

    def execute(self, task):
        """Queue *task* for asynchronous execution by a worker thread."""
        self.queue.put(task)

    def close(self):
        """Signal the workers to stop and wait for them to exit.

        Workers poll the queue with a 1 second timeout, so each thread
        terminates within roughly one second of the flag being set.
        Joining here prevents the race where a caller (see
        ``ExecuteCommand``) returns while a task is still mid-execution.
        Note: tasks still sitting in the queue when ``close`` is called
        are not guaranteed to run.
        """
        self.stopped = True
        for thread in self.pool:
            thread.join()

    def _processor(self):
        # Worker loop: pull a task, run it, and report the outcome via
        # the base-class signal helpers.
        while not self.stopped:
            try:
                task = self.queue.get(timeout=1)
            except Queue.Empty:
                # No work available; loop so the stop flag is re-checked
                # at least once per second. (The previous bare `except`
                # here swallowed *every* exception, hiding real errors.)
                continue
            self.task_started(task.id)
            try:
                operation_context = task.operation_context
                task_func = self._load_task(
                    operation_context.operation_details['operation'])
                task_func(**operation_context.inputs)
                self.task_succeeded(task.id)
            except BaseException as e:
                # Deliberately broad: any failure inside the operation is
                # reported through the failure signal rather than killing
                # the worker thread.
                self.task_failed(task.id, exception=e)

    def _load_task(self, handler_path):
        """Resolve a dotted ``module.path.function`` string to a callable.

        :param handler_path: dotted path; everything before the final dot
            is the module, the final segment is the attribute to fetch.
        :raises ImportError: if the module cannot be imported
        :raises AttributeError: if the module lacks the named attribute
        """
        module_name, spec_handler_name = handler_path.rsplit('.', 1)
        module = import_module(module_name)
        return getattr(module, spec_handler_name)

Reply via email to