Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-26-plugin-mechanism ad9cab78d -> b9e292d83 (forced update)
ARIA-26 TBD Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b9e292d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b9e292d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b9e292d8 Branch: refs/heads/ARIA-26-plugin-mechanism Commit: b9e292d8338effc80cba75379d2df1f8c426fafc Parents: b33c70e Author: Dan Kilman <[email protected]> Authored: Sun Nov 27 16:31:29 2016 +0200 Committer: Dan Kilman <[email protected]> Committed: Mon Nov 28 17:31:22 2016 +0200 ---------------------------------------------------------------------- aria/cli/commands.py | 4 +- .../orchestrator/workflows/executor/__init__.py | 2 +- .../orchestrator/workflows/executor/blocking.py | 36 ------- .../workflows/executor/multiprocess.py | 61 ++++++----- aria/orchestrator/workflows/executor/thread.py | 3 +- aria/storage/models.py | 2 +- aria/utils/plugin.py | 21 ++++ requirements.txt | 1 + tests/orchestrator/context/test_toolbelt.py | 1 + .../orchestrator/workflows/core/test_engine.py | 8 +- .../workflows/executor/test_executor.py | 103 +++++++++---------- 11 files changed, 113 insertions(+), 129 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index 57118a7..ab035ba 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -31,7 +31,7 @@ from ..logger import LoggerMixin from ..storage import (FileSystemModelDriver, FileSystemResourceDriver) from ..orchestrator.context.workflow import WorkflowContext from ..orchestrator.workflows.core.engine import Engine -from ..orchestrator.workflows.executor.thread import ThreadExecutor +from ..orchestrator.workflows.executor.multiprocess import MultiprocessExecutor 
from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications) from ..parser.consumption import ( ConsumptionContext, @@ -248,7 +248,7 @@ class ExecuteCommand(BaseCommand): ) workflow_function = self._load_workflow_handler(workflow['operation']) tasks_graph = workflow_function(workflow_context, **workflow_context.parameters) - executor = ThreadExecutor() + executor = MultiprocessExecutor() workflow_engine = Engine(executor=executor, workflow_context=workflow_context, tasks_graph=tasks_graph) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/aria/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py index 16b6c9b..7b205c1 100644 --- a/aria/orchestrator/workflows/executor/__init__.py +++ b/aria/orchestrator/workflows/executor/__init__.py @@ -18,5 +18,5 @@ Executors for task execution """ -from . import blocking, multiprocess, thread +from . import multiprocess, thread from .base import BaseExecutor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/aria/orchestrator/workflows/executor/blocking.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py deleted file mode 100644 index 9d3a9ba..0000000 --- a/aria/orchestrator/workflows/executor/blocking.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. 
You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Blocking executor -""" - -from aria.utils import imports -from .base import BaseExecutor - - -class CurrentThreadBlockingExecutor(BaseExecutor): - """ - Executor which runs tasks in the current thread (blocking) - """ - - def execute(self, task): - self._task_started(task) - try: - task_func = imports.load_attribute(task.operation_mapping) - task_func(ctx=task.context, **task.inputs) - self._task_succeeded(task) - except BaseException as e: - self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/aria/orchestrator/workflows/executor/multiprocess.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py index d770e07..0537381 100644 --- a/aria/orchestrator/workflows/executor/multiprocess.py +++ b/aria/orchestrator/workflows/executor/multiprocess.py @@ -17,6 +17,7 @@ Multiprocess based executor """ +import collections import multiprocessing import threading @@ -26,6 +27,11 @@ from aria.utils import imports from .base import BaseExecutor +_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id') +_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id') +_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception') + + class MultiprocessExecutor(BaseExecutor): """ Executor which runs tasks in a multiprocess environment @@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor): self._listener_thread = 
threading.Thread(target=self._listener) self._listener_thread.daemon = True self._listener_thread.start() - self._pool = multiprocessing.Pool(processes=pool_size) - - def execute(self, task): - self._tasks[task.id] = task - self._pool.apply_async(_multiprocess_handler, args=( - self._queue, - task.context, - task.id, - task.operation_mapping, - task.inputs)) + self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1) def close(self): self._pool.close() self._stopped = True self._pool.join() + self._manager.shutdown() + self._manager.join() self._listener_thread.join() + def execute(self, task): + self._tasks[task.id] = task + self._pool.apply_async(_handler, kwds={ + 'queue': self._queue, + 'ctx': task.context, + 'task_id': task.id, + 'operation_mapping': task.operation_mapping, + 'operation_inputs': task.inputs + }) + + def _remove_task(self, task_id): + return self._tasks.pop(task_id) + def _listener(self): while not self._stopped: try: message = self._queue.get(timeout=1) - if message.type == 'task_started': + if isinstance(message, _TaskStarted): self._task_started(self._tasks[message.task_id]) - elif message.type == 'task_succeeded': + elif isinstance(message, _TaskSucceeded): self._task_succeeded(self._remove_task(message.task_id)) - elif message.type == 'task_failed': + elif isinstance(message, _TaskFailed): self._task_failed(self._remove_task(message.task_id), exception=jsonpickle.loads(message.exception)) else: - # TODO: something - raise RuntimeError() + raise RuntimeError('Invalid state') # Daemon threads except BaseException: pass - def _remove_task(self, task_id): - return self._tasks.pop(task_id) - - -class _MultiprocessMessage(object): - - def __init__(self, type, task_id, exception=None): - self.type = type - self.task_id = task_id - self.exception = exception - -def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs): - queue.put(_MultiprocessMessage(type='task_started', task_id=task_id)) +def 
_handler(queue, ctx, task_id, operation_mapping, operation_inputs): + queue.put(_TaskStarted(task_id)) try: task_func = imports.load_attribute(operation_mapping) task_func(ctx=ctx, **operation_inputs) - queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id)) + queue.put(_TaskSucceeded(task_id)) except BaseException as e: - queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id, - exception=jsonpickle.dumps(e))) + queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e))) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 76ceefd..16a58fb 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -26,7 +26,8 @@ from .base import BaseExecutor class ThreadExecutor(BaseExecutor): """ - Executor which runs tasks in a separate thread + Executor which runs tasks in a separate thread. It's easier writing tests + using this executor rather than the full blown multiprocessing executor. 
""" def __init__(self, pool_size=1, *args, **kwargs): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index d24ad75..e587b94 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -441,4 +441,4 @@ class Task(Model): name = Field(type=basestring) operation_mapping = Field(type=basestring) actor = Field() - inputs = Field(type=dict, default=lambda: {}) + inputs = Field(type=dict, default=dict) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/aria/utils/plugin.py ---------------------------------------------------------------------- diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py index bb2b974..9868c89 100644 --- a/aria/utils/plugin.py +++ b/aria/utils/plugin.py @@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python code loading """ import os +import tempfile +import subprocess +import sys from importlib import import_module +import wagon + def plugin_installer(path, plugin_suffix, package=None, callback=None): """ @@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None, callback=None): module = import_module(module_name) if callback: callback(module) + + +def create(source, destination_dir): + return wagon.create(source=source, archive_destination_dir=destination_dir) + + +def install(source, prefix): + with tempfile.NamedTemporaryFile() as constraint: + constraint.write(subprocess.check_output([sys.executable, '-m', 'pip', 'freeze'])) + constraint.flush() + wagon.install( + source=source, + install_args='--prefix="{prefix}" --constraint="{constraint}"'.format( + prefix=prefix, + constraint=constraint.name) + ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/requirements.txt ---------------------------------------------------------------------- diff --git a/requirements.txt 
b/requirements.txt index e6d5393..770d416 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,3 +23,4 @@ Jinja2==2.8 shortuuid==0.4.3 CacheControl[filecache]==0.11.6 clint==0.5.1 +wagon==0.5.0 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/tests/orchestrator/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py index 547e62b..2a6c349 100644 --- a/tests/orchestrator/context/test_toolbelt.py +++ b/tests/orchestrator/context/test_toolbelt.py @@ -152,6 +152,7 @@ def test_wrong_model_toolbelt(): with pytest.raises(RuntimeError): context.toolbelt(None) + @operation(toolbelt=True) def host_ip(toolbelt, **_): global_test_holder['host_ip'] = toolbelt.host_ip http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 1b00bf6..99b31c6 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -79,14 +79,14 @@ class BaseTest(object): ignore_failure=ignore_failure ) - @pytest.fixture(scope='function', autouse=True) + @pytest.fixture(autouse=True) def globals_cleanup(self): try: yield finally: global_test_holder.clear() - @pytest.fixture(scope='function', autouse=True) + @pytest.fixture(autouse=True) def signals_registration(self, ): def sent_task_handler(*args, **kwargs): calls = global_test_holder.setdefault('sent_task_signal_calls', 0) @@ -119,7 +119,7 @@ class BaseTest(object): events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) events.sent_task_signal.disconnect(sent_task_handler) - @pytest.fixture(scope='function') + @pytest.fixture def executor(self): result = 
thread.ThreadExecutor() try: @@ -127,7 +127,7 @@ class BaseTest(object): finally: result.close() - @pytest.fixture(scope='function') + @pytest.fixture def workflow_context(self): model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver()) model_storage.setup() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b9e292d8/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index a425799..fc81ecc 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -25,7 +25,6 @@ from aria.orchestrator import events from aria.orchestrator.workflows.executor import ( thread, multiprocess, - blocking, # celery ) @@ -38,47 +37,24 @@ except ImportError: app = None -class TestExecutor(object): - - @pytest.mark.parametrize('executor_cls,executor_kwargs', [ - (thread.ThreadExecutor, {'pool_size': 1}), - (thread.ThreadExecutor, {'pool_size': 2}), - (multiprocess.MultiprocessExecutor, {'pool_size': 1}), - (multiprocess.MultiprocessExecutor, {'pool_size': 2}), - (blocking.CurrentThreadBlockingExecutor, {}), - # (celery.CeleryExecutor, {'app': app}) - ]) - def test_execute(self, executor_cls, executor_kwargs): - self.executor = executor_cls(**executor_kwargs) - expected_value = 'value' - successful_task = MockTask(mock_successful_task) - failing_task = MockTask(mock_failing_task) - task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) - - for task in [successful_task, failing_task, task_with_inputs]: - self.executor.execute(task) - - @retrying.retry(stop_max_delay=10000, wait_fixed=100) - def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 
'failure'] - assert isinstance(failing_task.exception, MockException) - assert isinstance(task_with_inputs.exception, MockException) - assert task_with_inputs.exception.message == expected_value - assertion() - - def setup_method(self): - events.start_task_signal.connect(start_handler) - events.on_success_task_signal.connect(success_handler) - events.on_failure_task_signal.connect(failure_handler) - - def teardown_method(self): - events.start_task_signal.disconnect(start_handler) - events.on_success_task_signal.disconnect(success_handler) - events.on_failure_task_signal.disconnect(failure_handler) - if hasattr(self, 'executor'): - self.executor.close() +def test_execute(executor): + expected_value = 'value' + successful_task = MockTask(mock_successful_task) + failing_task = MockTask(mock_failing_task) + task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) + + for task in [successful_task, failing_task, task_with_inputs]: + executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, MockException) + assert isinstance(task_with_inputs.exception, MockException) + assert task_with_inputs.exception.message == expected_value + assertion() def mock_successful_task(**_): @@ -128,14 +104,35 @@ class MockTask(object): yield self -def start_handler(task, *args, **kwargs): - task.states.append('start') - - -def success_handler(task, *args, **kwargs): - task.states.append('success') - - -def failure_handler(task, exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception +@pytest.fixture(params=[ + (thread.ThreadExecutor, {'pool_size': 1}), + (thread.ThreadExecutor, {'pool_size': 2}), + (multiprocess.MultiprocessExecutor, {'pool_size': 1}), +
(multiprocess.MultiprocessExecutor, {'pool_size': 2}), + # (celery.CeleryExecutor, {'app': app}) +]) +def executor(request): + executor_cls, executor_kwargs = request.param + result = executor_cls(**executor_kwargs) + yield result + result.close() + + +@pytest.fixture(autouse=True) +def register_signals(): + def start_handler(task, *args, **kwargs): + task.states.append('start') + + def success_handler(task, *args, **kwargs): + task.states.append('success') + + def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + yield + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler)
