Repository: incubator-ariatosca Updated Branches: refs/heads/wf-wip c52f949e9 -> b53aa1f3b
implement basic local thread executor Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b53aa1f3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b53aa1f3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b53aa1f3 Branch: refs/heads/wf-wip Commit: b53aa1f3b7c5f3685c24c367143a5c47c1957526 Parents: c52f949 Author: Dan Kilman <[email protected]> Authored: Thu Oct 13 15:44:58 2016 +0300 Committer: Dan Kilman <[email protected]> Committed: Thu Oct 13 15:44:58 2016 +0300 ---------------------------------------------------------------------- aria/cli/commands.py | 6 ++-- aria/workflows/engine/executor.py | 50 ++++++++++++++++++++++++++++++++-- 2 files changed, 52 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b53aa1f3/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index 465569f..9fa4911 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -27,7 +27,7 @@ from aria.storage import FileSystemModelDriver, FileSystemResourceDriver from aria.tools.application import StorageManager from aria.contexts import WorkflowContext from aria.workflows.engine.engine import Engine -from aria.workflows.engine.executor import LocalExecutor +from aria.workflows.engine.executor import LocalThreadExecutor from .storage import ( local_resource_storage, @@ -225,10 +225,12 @@ class ExecuteCommand(BaseCommand): ) workflow_function = self._load_workflow_handler(workflow['operation']) tasks_graph = workflow_function(workflow_context, **workflow_context.parameters) - workflow_engine = Engine(executor=LocalExecutor(), + executor = LocalThreadExecutor() + workflow_engine = Engine(executor=executor, workflow_context=workflow_context, tasks_graph=tasks_graph) workflow_engine.execute() + executor.close() def _merge_and_validate_execution_parameters( self, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b53aa1f3/aria/workflows/engine/executor.py ---------------------------------------------------------------------- diff --git a/aria/workflows/engine/executor.py b/aria/workflows/engine/executor.py index 1b9f276..dacfc15 100644 --- a/aria/workflows/engine/executor.py +++ b/aria/workflows/engine/executor.py @@ -13,6 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading +import Queue +from importlib import import_module + from aria.events import ( start_task_signal, on_success_task_signal, @@ -35,7 +39,49 @@ class Executor(object): on_success_task_signal.send(self, task_id=task_id) -class LocalExecutor(Executor): +class LocalThreadExecutor(Executor): + + def __init__(self, pool_size=1): + self.stopped = False + self.queue = Queue.Queue() + self.pool = [] + for i in range(pool_size): + name = 'LocalThreadExecutor-{index}'.format(index=i+1) + thread = threading.Thread(target=self._processor, name=name) + thread.daemon = True + thread.start() + self.pool.append(thread) def execute(self, task): - pass + self.queue.put(task) + + def close(self): + self.stopped = True + + def _processor(self): + while not self.stopped: + try: + task = self.queue.get(timeout=1) + self.task_started(task.id) + try: + operation_context = task.operation_context + task_func = self._load_task(operation_context.operation_details['operation']) + task_func(**operation_context.inputs) + self.task_succeeded(task.id) + except BaseException as e: + self.task_failed(task.id, exception=e) + # Daemon threads + except: + pass + + def _load_task(self, handler_path): + module_name, spec_handler_name = handler_path.rsplit('.', 1) + try: + module = import_module(module_name) + return getattr(module, spec_handler_name) + except ImportError: + # TODO: handle + raise + except AttributeError: + # TODO: handle + raise
