Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-26-plugin-mechanism 9d626da95 -> 5cf84eebe (forced update)
ARIA-26 Implement operation plugin mechanism Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5cf84eeb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5cf84eeb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5cf84eeb Branch: refs/heads/ARIA-26-plugin-mechanism Commit: 5cf84eebe425bbade0d9285081cebf5d0a62a675 Parents: 04c9bd0 Author: Dan Kilman <d...@gigaspaces.com> Authored: Sun Nov 27 16:31:29 2016 +0200 Committer: Dan Kilman <d...@gigaspaces.com> Committed: Mon Dec 19 15:47:38 2016 +0200 ---------------------------------------------------------------------- aria/cli/commands.py | 4 +- aria/orchestrator/exceptions.py | 7 + aria/orchestrator/plugin.py | 105 +++++++ aria/orchestrator/workflows/api/task.py | 44 ++- aria/orchestrator/workflows/core/task.py | 12 +- .../orchestrator/workflows/executor/__init__.py | 2 +- aria/orchestrator/workflows/executor/base.py | 6 +- .../orchestrator/workflows/executor/blocking.py | 36 --- .../workflows/executor/multiprocess.py | 98 ------- aria/orchestrator/workflows/executor/process.py | 277 +++++++++++++++++++ aria/orchestrator/workflows/executor/thread.py | 4 +- aria/storage/models.py | 15 +- aria/utils/plugin.py | 20 ++ requirements.txt | 3 +- tests/mock/models.py | 16 ++ tests/orchestrator/workflows/api/test_task.py | 51 +++- tests/orchestrator/workflows/core/test_task.py | 16 +- .../workflows/executor/test_executor.py | 111 ++++---- .../workflows/executor/test_process_executor.py | 130 +++++++++ tests/resources/__init__.py | 19 ++ .../plugins/mock-plugin1/mock_plugin1.py | 27 ++ tests/resources/plugins/mock-plugin1/setup.py | 28 ++ tests/storage/test_models.py | 188 ++++++------- tests/utils/__init__.py | 14 + tests/utils/test_plugin.py | 77 ++++++ 25 files changed, 990 insertions(+), 320 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index 141da07..1cd765f 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -34,7 +34,7 @@ from ..logger import LoggerMixin from ..storage import (FileSystemModelDriver, FileSystemResourceDriver) from ..orchestrator.context.workflow import WorkflowContext from ..orchestrator.workflows.core.engine import Engine -from ..orchestrator.workflows.executor.thread import ThreadExecutor +from ..orchestrator.workflows.executor.process import ProcessExecutor from ..parser import iter_specifications from ..parser.consumption import ( ConsumptionContext, @@ -252,7 +252,7 @@ class ExecuteCommand(BaseCommand): ) workflow_function = self._load_workflow_handler(workflow['operation']) tasks_graph = workflow_function(workflow_context, **workflow_context.parameters) - executor = ThreadExecutor() + executor = ProcessExecutor() workflow_engine = Engine(executor=executor, workflow_context=workflow_context, tasks_graph=tasks_graph) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py index 1a48194..74e9002 100644 --- a/aria/orchestrator/exceptions.py +++ b/aria/orchestrator/exceptions.py @@ -23,3 +23,10 @@ class OrchestratorError(AriaError): Orchestrator based exception """ pass + + +class PluginAlreadyExistsError(AriaError): + """ + Raised when a plugin with the same package name and package version already exists + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/plugin.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/plugin.py 
b/aria/orchestrator/plugin.py new file mode 100644 index 0000000..3005756 --- /dev/null +++ b/aria/orchestrator/plugin.py @@ -0,0 +1,105 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import tempfile +import subprocess +import sys +from datetime import datetime + +import wagon + +from . import exceptions + + +class PluginManager(object): + + def __init__(self, model, plugins_dir): + """ + :param plugins_dir: Root directory to install plugins in. + """ + self._model = model + self._plugins_dir = plugins_dir + + def install(self, source): + """ + Install a wagon plugin. 
+ """ + metadata = wagon.show(source) + cls = self._model.plugin.model_cls + plugin = cls( + archive_name=metadata['archive_name'], + supported_platform=metadata['supported_platform'], + supported_py_versions=metadata['supported_python_versions'], + # Remove suffix colon after upgrading wagon to > 0.5.0 + distribution=metadata['build_server_os_properties']['distribution:'], + distribution_release=metadata['build_server_os_properties']['distribution_version'], + distribution_version=metadata['build_server_os_properties']['distribution_release'], + package_name=metadata['package_name'], + package_version=metadata['package_version'], + package_source=metadata['package_source'], + wheels=metadata['wheels'], + uploaded_at=datetime.now() + ) + if len(self._model.plugin.list(filters={'package_name': plugin.package_name, + 'package_version': plugin.package_version})): + raise exceptions.PluginAlreadyExistsError( + 'Plugin {0}, version {1} already exists'.format(plugin.package_name, + plugin.package_version)) + self._install_wagon(source=source, prefix=self.get_plugin_prefix(plugin)) + self._model.plugin.put(plugin) + return plugin + + def get_plugin_prefix(self, plugin): + return os.path.join( + self._plugins_dir, + '{0}-{1}'.format(plugin.package_name, plugin.package_version)) + + def _install_wagon(self, source, prefix): + pip_freeze_output = self._pip_freeze() + file_descriptor, constraint_path = tempfile.mkstemp(prefix='constraint-', suffix='.txt') + os.close(file_descriptor) + try: + with open(constraint_path, 'wb') as constraint: + constraint.write(pip_freeze_output) + # Install the provided wagon. + # * The --prefix install_arg will cause the plugin to be installed under + # plugins_dir/{package_name}-{package_version}, So different plugins don't step on + # each other and don't interfere with the current virtualenv + # * The --constraint flag points a file containing the output of ``pip freeze``. 
+ # It is required, to handle cases where plugins depend on some python package with + # a different version than the one installed in the current virtualenv. Without this + # flag, the existing package will be **removed** from the parent virtualenv and the + # new package will be installed under prefix. With the flag, the existing version will + # remain, and the version requested by the plugin will be ignored. + wagon.install( + source=source, + install_args='--prefix="{prefix}" --constraint="{constraint}"'.format( + prefix=prefix, + constraint=constraint.name), + virtualenv=os.environ.get('VIRTUAL_ENV')) + finally: + os.remove(constraint_path) + + @staticmethod + def _pip_freeze(): + """Run pip freeze in current environment and return the output""" + bin_dir = 'Scripts' if os.name == 'nt' else 'bin' + pip_path = os.path.join(sys.prefix, bin_dir, + 'pip{0}'.format('.exe' if os.name == 'nt' else '')) + pip_freeze = subprocess.Popen([pip_path, 'freeze'], stdout=subprocess.PIPE) + pip_freeze_output, _ = pip_freeze.communicate() + assert not pip_freeze.poll() + return pip_freeze_output http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 1c12407..4f025b6 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -67,11 +67,11 @@ class OperationTask(BaseTask): max_attempts=None, retry_interval=None, ignore_failure=None, - inputs=None): + inputs=None, + plugin=None): """ Creates an operation task using the name, details, node instance and any additional kwargs. :param name: the operation of the name. - :param operation_details: the details for the operation. :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. 
""" @@ -82,6 +82,7 @@ class OperationTask(BaseTask): self.name = '{name}.{actor.id}'.format(name=name, actor=actor) self.operation_mapping = operation_mapping self.inputs = inputs or {} + self.plugin = plugin or {} self.max_attempts = (self.workflow_context._task_max_attempts if max_attempts is None else max_attempts) self.retry_interval = (self.workflow_context._task_retry_interval @@ -98,15 +99,13 @@ class OperationTask(BaseTask): :param name: the name of the operation. """ assert isinstance(instance, models.NodeInstance) - operation_details = instance.node.operations[name] - operation_inputs = operation_details.get('inputs', {}) - operation_inputs.update(inputs or {}) - return cls(name=name, - actor=instance, - operation_mapping=operation_details.get('operation', ''), - inputs=operation_inputs, - *args, - **kwargs) + return cls._instance(instance=instance, + name=name, + operation_details=instance.node.operations[name], + inputs=inputs, + plugins=instance.node.plugins or [], + *args, + **kwargs) @classmethod def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs): @@ -125,12 +124,33 @@ class OperationTask(BaseTask): cls.TARGET_OPERATION, cls.SOURCE_OPERATION )) operation_details = getattr(instance.relationship, operation_end)[name] + if operation_end == cls.SOURCE_OPERATION: + plugins = instance.relationship.source_node.plugins + else: + plugins = instance.relationship.target_node.plugins + return cls._instance(instance=instance, + name=name, + operation_details=operation_details, + inputs=inputs, + plugins=plugins or [], + *args, + **kwargs) + + @classmethod + def _instance(cls, instance, name, operation_details, inputs, plugins, *args, **kwargs): + operation_mapping = operation_details.get('operation') operation_inputs = operation_details.get('inputs', {}) operation_inputs.update(inputs or {}) + plugin_name = operation_details.get('plugin') + matching_plugins = [p for p in plugins if p['name'] == plugin_name] + # All 
matching plugins should have identical package_name/package_version, so it's safe to + # take the first found. + plugin = matching_plugins[0] if matching_plugins else {} return cls(actor=instance, name=name, - operation_mapping=operation_details.get('operation'), + operation_mapping=operation_mapping, inputs=operation_inputs, + plugin=plugin, *args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 0be17fe..08cf26e 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -106,8 +106,9 @@ class OperationTask(BaseTask): def __init__(self, api_task, *args, **kwargs): super(OperationTask, self).__init__(id=api_task.id, **kwargs) self._workflow_context = api_task._workflow_context - base_task_model = api_task._workflow_context.model.task.model_cls + model = api_task._workflow_context.model + base_task_model = model.task.model_cls if isinstance(api_task.actor, models.NodeInstance): context_class = operation_context.NodeOperationContext task_model_cls = base_task_model.as_node_instance @@ -117,7 +118,13 @@ class OperationTask(BaseTask): else: raise RuntimeError('No operation context could be created for {actor.model_cls}' .format(actor=api_task.actor)) - + plugin = api_task.plugin + plugins = model.plugin.list(filters={'package_name': plugin.get('package_name', ''), + 'package_version': plugin.get('package_version', '')}, + include=['id']) + # Validation during installation ensures that at most one plugin can exist with provided + # package_name and package_version + plugin_id = plugins[0].id if plugins else None operation_task = task_model_cls( name=api_task.name, operation_mapping=api_task.operation_mapping, @@ -127,6 +134,7 @@ class OperationTask(BaseTask): 
max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, ignore_failure=api_task.ignore_failure, + plugin_id=plugin_id ) self._workflow_context.model.task.put(operation_task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py index 16b6c9b..414a740 100644 --- a/aria/orchestrator/workflows/executor/__init__.py +++ b/aria/orchestrator/workflows/executor/__init__.py @@ -18,5 +18,5 @@ Executors for task execution """ -from . import blocking, multiprocess, thread +from . import process, thread from .base import BaseExecutor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index ba44124..4ae046d 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -17,17 +17,15 @@ Base executor module """ +from aria import logger from aria.orchestrator import events -class BaseExecutor(object): +class BaseExecutor(logger.LoggerMixin): """ Base class for executors for running tasks """ - def __init__(self, *args, **kwargs): - pass - def execute(self, task): """ Execute a task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/blocking.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py deleted file mode 100644 index 9d3a9ba..0000000 --- a/aria/orchestrator/workflows/executor/blocking.py +++ /dev/null @@ -1,36 +0,0 @@ -# Licensed to 
the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Blocking executor -""" - -from aria.utils import imports -from .base import BaseExecutor - - -class CurrentThreadBlockingExecutor(BaseExecutor): - """ - Executor which runs tasks in the current thread (blocking) - """ - - def execute(self, task): - self._task_started(task) - try: - task_func = imports.load_attribute(task.operation_mapping) - task_func(ctx=task.context, **task.inputs) - self._task_succeeded(task) - except BaseException as e: - self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/multiprocess.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py deleted file mode 100644 index d770e07..0000000 --- a/aria/orchestrator/workflows/executor/multiprocess.py +++ /dev/null @@ -1,98 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. 
-# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Multiprocess based executor -""" - -import multiprocessing -import threading - -import jsonpickle - -from aria.utils import imports -from .base import BaseExecutor - - -class MultiprocessExecutor(BaseExecutor): - """ - Executor which runs tasks in a multiprocess environment - """ - - def __init__(self, pool_size=1, *args, **kwargs): - super(MultiprocessExecutor, self).__init__(*args, **kwargs) - self._stopped = False - self._manager = multiprocessing.Manager() - self._queue = self._manager.Queue() - self._tasks = {} - self._listener_thread = threading.Thread(target=self._listener) - self._listener_thread.daemon = True - self._listener_thread.start() - self._pool = multiprocessing.Pool(processes=pool_size) - - def execute(self, task): - self._tasks[task.id] = task - self._pool.apply_async(_multiprocess_handler, args=( - self._queue, - task.context, - task.id, - task.operation_mapping, - task.inputs)) - - def close(self): - self._pool.close() - self._stopped = True - self._pool.join() - self._listener_thread.join() - - def _listener(self): - while not self._stopped: - try: - message = self._queue.get(timeout=1) - if message.type == 'task_started': - self._task_started(self._tasks[message.task_id]) - elif message.type == 'task_succeeded': - self._task_succeeded(self._remove_task(message.task_id)) - elif message.type == 'task_failed': - self._task_failed(self._remove_task(message.task_id), - 
exception=jsonpickle.loads(message.exception)) - else: - # TODO: something - raise RuntimeError() - # Daemon threads - except BaseException: - pass - - def _remove_task(self, task_id): - return self._tasks.pop(task_id) - - -class _MultiprocessMessage(object): - - def __init__(self, type, task_id, exception=None): - self.type = type - self.task_id = task_id - self.exception = exception - - -def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs): - queue.put(_MultiprocessMessage(type='task_started', task_id=task_id)) - try: - task_func = imports.load_attribute(operation_mapping) - task_func(ctx=ctx, **operation_inputs) - queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id)) - except BaseException as e: - queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id, - exception=jsonpickle.dumps(e))) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py new file mode 100644 index 0000000..1a47d4c --- /dev/null +++ b/aria/orchestrator/workflows/executor/process.py @@ -0,0 +1,277 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Subprocess based executor +""" + +# pylint: disable=wrong-import-position + +import sys +import os + +# As part of the process executor implementation, subprocesses are started with this module as their +# entry point. We thus remove this module's directory from the python path if it happens to be +# there +script_dir = os.path.dirname(__file__) +if script_dir in sys.path: + sys.path.remove(script_dir) + +import io +import threading +import socket +import struct +import subprocess +import tempfile +import Queue + +import jsonpickle + +from aria.utils import imports +from aria.orchestrator.workflows.executor import base + +_IS_WIN = os.name == 'nt' + +_INT_FMT = 'I' +_INT_SIZE = struct.calcsize(_INT_FMT) + + +class ProcessExecutor(base.BaseExecutor): + """ + Executor which runs tasks in a subprocess environment + """ + + def __init__(self, plugin_manager=None, python_path=None, *args, **kwargs): + super(ProcessExecutor, self).__init__(*args, **kwargs) + self._plugin_manager = plugin_manager + + # Optional list of additional directories that should be added to + # subprocesses python path + self._python_path = python_path or [] + + # Flag that denotes whether this executor has been stopped + self._stopped = False + + # Contains reference to all currently running tasks + self._tasks = {} + + # Server socket used to accept task status messages from subprocesses + self._server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._server_socket.bind(('localhost', 0)) + self._server_socket.listen(10) + self._server_port = self._server_socket.getsockname()[1] + + # Used to send a "closed" message to the listener when this executor is closed + self._messenger = _Messenger(task_id=None, port=self._server_port) + + # Queue object used by the listener thread to notify this constructor it has started + # (see last line of this __init__ method) + 
self._listener_started = Queue.Queue() + + # Listener thread to handle subprocesses task status messages + self._listener_thread = threading.Thread(target=self._listener) + self._listener_thread.daemon = True + self._listener_thread.start() + + # Wait for listener thread to actually start before returning + self._listener_started.get(timeout=60) + + def close(self): + if self._stopped: + return + self._stopped = True + # Listener thread may be blocked on "accept" call. This will wake it up with an explicit + # "closed" message + self._messenger.closed() + self._server_socket.close() + self._listener_thread.join(timeout=60) + + def execute(self, task): + self._check_closed() + self._tasks[task.id] = task + + # Temporary file used to pass arguments to the started subprocess + file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') + os.close(file_descriptor) + with open(arguments_json_path, 'wb') as f: + f.write(jsonpickle.dumps({ + 'task_id': task.id, + 'operation_mapping': task.operation_mapping, + 'operation_inputs': task.inputs, + 'port': self._server_port + })) + + env = os.environ.copy() + # See _update_env for plugin_prefix usage + if task.plugin_id and self._plugin_manager: + plugin_prefix = self._plugin_manager.get_plugin_prefix(task.plugin) + else: + plugin_prefix = None + self._update_env(env=env, plugin_prefix=plugin_prefix) + # Asynchronously start the operation in a subprocess + subprocess.Popen( + '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path), + env=env, + shell=True) + + def _remove_task(self, task_id): + return self._tasks.pop(task_id) + + def _listener(self): + # Notify __init__ method this thread has actually started + self._listener_started.put(True) + while not self._stopped: + try: + # Accept messages written to the server socket + message = self._recv_message() + message_type = message['type'] + if message_type == 'closed': + break + task_id = message['task_id'] + if message_type == 
'started': + self._task_started(self._tasks[task_id]) + elif message_type == 'succeeded': + self._task_succeeded(self._remove_task(task_id)) + elif message_type == 'failed': + self._task_failed(self._remove_task(task_id), + exception=message['exception']) + else: + raise RuntimeError('Invalid state') + except BaseException as e: + self.logger.debug('Error in process executor listener: {0}'.format(e)) + + def _recv_message(self): + connection, _ = self._server_socket.accept() + try: + message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE)) + return jsonpickle.loads(self._recv_bytes(connection, message_len)) + finally: + connection.close() + + @staticmethod + def _recv_bytes(connection, count): + result = io.BytesIO() + while True: + if not count: + return result.getvalue() + read = connection.recv(count) + if not read: + return result.getvalue() + result.write(read) + count -= len(read) + + def _check_closed(self): + if self._stopped: + raise RuntimeError('Executor closed') + + def _update_env(self, env, plugin_prefix): + pythonpath_dirs = [] + # If this is a plugin operation, plugin prefix will point to where + # This plugin is installed. + # We update the environment variables that the subprocess will be started with based on it + if plugin_prefix: + + # Update PATH environment variable to include plugin's bin dir + bin_dir = 'Scripts' if _IS_WIN else 'bin' + env['PATH'] = '{0}{1}{2}'.format( + os.path.join(plugin_prefix, bin_dir), + os.pathsep, + env.get('PATH', '')) + + # Update PYTHONPATH environment variable to include plugin's site-packages + # directories + if _IS_WIN: + pythonpath_dirs = [os.path.join(plugin_prefix, 'Lib', 'site-packages')] + else: + # In some linux environments, there will be both a lib and a lib64 directory + # with the latter, containing compiled packages. 
+ pythonpath_dirs = [os.path.join( + plugin_prefix, 'lib{0}'.format(b), + 'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1]), + 'site-packages') for b in ['', '64']] + + # Add user supplied directories to injected PYTHONPATH + pythonpath_dirs.extend(self._python_path) + + if pythonpath_dirs: + env['PYTHONPATH'] = '{0}{1}{2}'.format( + os.pathsep.join(pythonpath_dirs), + os.pathsep, + env.get('PYTHONPATH', '')) + + +class _Messenger(object): + + def __init__(self, task_id, port): + self.task_id = task_id + self.port = port + + def started(self): + """Task started message""" + self._send_message(type='started') + + def succeeded(self): + """Task succeeded message""" + self._send_message(type='succeeded') + + def failed(self, exception): + """Task failed message""" + self._send_message(type='failed', exception=exception) + + def closed(self): + """Executor closed message""" + self._send_message(type='closed') + + def _send_message(self, type, exception=None): + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect(('localhost', self.port)) + try: + data = jsonpickle.dumps({ + 'type': type, + 'task_id': self.task_id, + 'exception': exception + }) + sock.send(struct.pack(_INT_FMT, len(data))) + sock.sendall(data) + finally: + sock.close() + + +def _main(): + arguments_json_path = sys.argv[1] + with open(arguments_json_path) as f: + arguments = jsonpickle.loads(f.read()) + + # arguments_json_path is a temporary file created by the parent process. 
+ # so we remove it here + os.remove(arguments_json_path) + + task_id = arguments['task_id'] + port = arguments['port'] + messenger = _Messenger(task_id=task_id, port=port) + + operation_mapping = arguments['operation_mapping'] + operation_inputs = arguments['operation_inputs'] + ctx = None + messenger.started() + try: + task_func = imports.load_attribute(operation_mapping) + task_func(ctx=ctx, **operation_inputs) + messenger.succeeded() + except BaseException as e: + messenger.failed(exception=e) + +if __name__ == '__main__': + _main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 76ceefd..1a6ad9f 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -26,7 +26,9 @@ from .base import BaseExecutor class ThreadExecutor(BaseExecutor): """ - Executor which runs tasks in a separate thread + Executor which runs tasks in a separate thread. It's easier writing tests + using this executor rather than the full blown subprocess executor. + Note: This executor is not capable of running plugin operations. 
""" def __init__(self, pool_size=1, *args, **kwargs): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 6302e66..0a1027b 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -344,8 +344,7 @@ class Node(SQLModelBase): min_number_of_instances = Column(Integer, nullable=False) number_of_instances = Column(Integer, nullable=False) planned_number_of_instances = Column(Integer, nullable=False) - plugins = Column(Dict) - plugins_to_install = Column(Dict) + plugins = Column(List) properties = Column(Dict) operations = Column(Dict) type = Column(Text, nullable=False, index=True) @@ -474,14 +473,13 @@ class Plugin(SQLModelBase): distribution = Column(Text) distribution_release = Column(Text) distribution_version = Column(Text) - excluded_wheels = Column(Dict) package_name = Column(Text, nullable=False, index=True) package_source = Column(Text) package_version = Column(Text) - supported_platform = Column(Dict) - supported_py_versions = Column(Dict) + supported_platform = Column(Text) + supported_py_versions = Column(List) uploaded_at = Column(DateTime, nullable=False, index=True) - wheels = Column(Dict, nullable=False) + wheels = Column(List, nullable=False) class Task(SQLModelBase): @@ -550,6 +548,11 @@ class Task(SQLModelBase): name = Column(String) operation_mapping = Column(String) inputs = Column(Dict) + plugin_id = foreign_key(Plugin.id, nullable=True) + + @declared_attr + def plugin(cls): + return one_to_many_relationship(cls, Plugin, cls.plugin_id) @declared_attr def execution(cls): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/aria/utils/plugin.py ---------------------------------------------------------------------- diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py new file mode 100644 index 0000000..b7f94a1 --- /dev/null +++ 
b/aria/utils/plugin.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import wagon + + +def create(source, destination_dir): + return wagon.create(source=source, archive_destination_dir=destination_dir) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/requirements.txt ---------------------------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index 31b0b79..0005a5e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -24,4 +24,5 @@ Jinja2==2.8 shortuuid==0.4.3 CacheControl[filecache]==0.11.6 clint==0.5.1 -SQLAlchemy==1.1.4 \ No newline at end of file +SQLAlchemy==1.1.4 +wagon==0.5.0 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index e2e3d2f..26088e0 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -152,3 +152,19 @@ def get_deployment(blueprint): outputs={}, scaling_groups={}, ) + + +def get_plugin(package_name='package', package_version='0.1'): + return models.Plugin( + archive_name='archive_name', + distribution='distribution', + 
distribution_release='dist_release', + distribution_version='dist_version', + package_name=package_name, + package_source='source', + package_version=package_version, + supported_platform='any', + supported_py_versions=['python27'], + uploaded_at=datetime.now(), + wheels=[], + ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py index 1a90338..58e387f 100644 --- a/tests/orchestrator/workflows/api/test_task.py +++ b/tests/orchestrator/workflows/api/test_task.py @@ -39,9 +39,12 @@ class TestOperationTask(object): def test_node_operation_task_creation(self, ctx): operation_name = 'aria.interfaces.lifecycle.create' - op_details = {'operation': True} + op_details = {'operation': True, 'plugin': 'plugin'} node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) node.operations[operation_name] = op_details + node.plugins = [{'name': 'plugin', + 'package_name': 'package', + 'package_version': '0.1'}] ctx.model.node.update(node) node_instance = \ ctx.model.node_instance.get_by_name(mock.models.DEPENDENT_NODE_INSTANCE_NAME) @@ -66,12 +69,18 @@ class TestOperationTask(object): assert api_task.retry_interval == retry_interval assert api_task.max_attempts == max_attempts assert api_task.ignore_failure == ignore_failure + assert api_task.plugin == {'name': 'plugin', + 'package_name': 'package', + 'package_version': '0.1'} - def test_relationship_operation_task_creation(self, ctx): + def test_source_relationship_operation_task_creation(self, ctx): operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure' - op_details = {'operation': True} + op_details = {'operation': True, 'plugin': 'plugin'} relationship = ctx.model.relationship.list()[0] relationship.source_operations[operation_name] = op_details + 
relationship.source_node.plugins = [{'name': 'plugin', + 'package_name': 'package', + 'package_version': '0.1'}] relationship_instance = ctx.model.relationship_instance.list()[0] inputs = {'inputs': True} max_attempts = 10 @@ -92,6 +101,41 @@ class TestOperationTask(object): assert api_task.inputs == inputs assert api_task.retry_interval == retry_interval assert api_task.max_attempts == max_attempts + assert api_task.plugin == {'name': 'plugin', + 'package_name': 'package', + 'package_version': '0.1'} + + def test_target_relationship_operation_task_creation(self, ctx): + operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure' + op_details = {'operation': True, 'plugin': 'plugin'} + relationship = ctx.model.relationship.list()[0] + relationship.target_operations[operation_name] = op_details + relationship.target_node.plugins = [{'name': 'plugin', + 'package_name': 'package', + 'package_version': '0.1'}] + relationship_instance = ctx.model.relationship_instance.list()[0] + inputs = {'inputs': True} + max_attempts = 10 + retry_interval = 10 + + with context.workflow.current.push(ctx): + api_task = api.task.OperationTask.relationship_instance( + name=operation_name, + instance=relationship_instance, + operation_end=api.task.OperationTask.TARGET_OPERATION, + inputs=inputs, + max_attempts=max_attempts, + retry_interval=retry_interval) + + assert api_task.name == '{0}.{1}'.format(operation_name, relationship_instance.id) + assert api_task.operation_mapping is True + assert api_task.actor == relationship_instance + assert api_task.inputs == inputs + assert api_task.retry_interval == retry_interval + assert api_task.max_attempts == max_attempts + assert api_task.plugin == {'name': 'plugin', + 'package_name': 'package', + 'package_version': '0.1'} def test_operation_task_default_values(self, ctx): dependency_node_instance = ctx.model.node_instance.get_by_name( @@ -106,6 +150,7 @@ class TestOperationTask(object): assert task.retry_interval == 
ctx._task_retry_interval assert task.max_attempts == ctx._task_max_attempts assert task.ignore_failure == ctx._task_ignore_failure + assert task.plugin == {} class TestWorkflowTask(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index c572501..6c3825c 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -42,24 +42,30 @@ class TestOperationTask(object): with workflow_context.current.push(ctx): api_task = api.task.OperationTask.node_instance( instance=node_instance, - name='aria.interfaces.lifecycle.create', - ) - + name='aria.interfaces.lifecycle.create') core_task = core.task.OperationTask(api_task=api_task) - return api_task, core_task def test_operation_task_creation(self, ctx): + storage_plugin = mock.models.get_plugin(package_name='p1', package_version='0.1') + storage_plugin_other = mock.models.get_plugin(package_name='p0', package_version='0.0') + ctx.model.plugin.put(storage_plugin_other) + ctx.model.plugin.put(storage_plugin) node_instance = \ ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + node = node_instance.node + node.plugins = [{'name': 'plugin1', + 'package_name': 'p1', + 'package_version': '0.1'}] + node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 'plugin1'} api_task, core_task = self._create_operation_task(ctx, node_instance) storage_task = ctx.model.task.get_by_name(core_task.name) - assert core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.operation_mapping == api_task.operation_mapping assert core_task.actor == api_task.actor == node_instance assert core_task.inputs == api_task.inputs == storage_task.inputs + assert core_task.plugin == 
storage_plugin def test_operation_task_edit_locked_attribute(self, ctx): node_instance = \ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 654542c..7a11524 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +import os import uuid from contextlib import contextmanager @@ -28,57 +29,34 @@ except ImportError: _celery = None app = None +import aria from aria.storage import models from aria.orchestrator import events from aria.orchestrator.workflows.executor import ( thread, - multiprocess, - blocking, + process, # celery ) -class TestExecutor(object): - - @pytest.mark.parametrize('executor_cls,executor_kwargs', [ - (thread.ThreadExecutor, {'pool_size': 1}), - (thread.ThreadExecutor, {'pool_size': 2}), - (multiprocess.MultiprocessExecutor, {'pool_size': 1}), - (multiprocess.MultiprocessExecutor, {'pool_size': 2}), - (blocking.CurrentThreadBlockingExecutor, {}), - # (celery.CeleryExecutor, {'app': app}) - ]) - def test_execute(self, executor_cls, executor_kwargs): - self.executor = executor_cls(**executor_kwargs) - expected_value = 'value' - successful_task = MockTask(mock_successful_task) - failing_task = MockTask(mock_failing_task) - task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) - - for task in [successful_task, failing_task, task_with_inputs]: - self.executor.execute(task) - - @retrying.retry(stop_max_delay=10000, wait_fixed=100) - def assertion(): - assert successful_task.states == ['start', 'success'] - assert failing_task.states == ['start', 'failure'] - assert task_with_inputs.states == ['start', 'failure'] - 
assert isinstance(failing_task.exception, MockException) - assert isinstance(task_with_inputs.exception, MockException) - assert task_with_inputs.exception.message == expected_value - assertion() - - def setup_method(self): - events.start_task_signal.connect(start_handler) - events.on_success_task_signal.connect(success_handler) - events.on_failure_task_signal.connect(failure_handler) - - def teardown_method(self): - events.start_task_signal.disconnect(start_handler) - events.on_success_task_signal.disconnect(success_handler) - events.on_failure_task_signal.disconnect(failure_handler) - if hasattr(self, 'executor'): - self.executor.close() +def test_execute(executor): + expected_value = 'value' + successful_task = MockTask(mock_successful_task) + failing_task = MockTask(mock_failing_task) + task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value}) + + for task in [successful_task, failing_task, task_with_inputs]: + executor.execute(task) + + @retrying.retry(stop_max_delay=10000, wait_fixed=100) + def assertion(): + assert successful_task.states == ['start', 'success'] + assert failing_task.states == ['start', 'failure'] + assert task_with_inputs.states == ['start', 'failure'] + assert isinstance(failing_task.exception, MockException) + assert isinstance(task_with_inputs.exception, MockException) + assert task_with_inputs.exception.message == expected_value + assertion() def mock_successful_task(**_): @@ -116,9 +94,10 @@ class MockTask(object): self.logger = logging.getLogger() self.name = name self.inputs = inputs or {} - self.context = ctx or None + self.context = ctx self.retry_count = 0 self.max_attempts = 1 + self.plugin_id = None for state in models.Task.STATES: setattr(self, state.upper(), state) @@ -128,14 +107,36 @@ class MockTask(object): yield self -def start_handler(task, *args, **kwargs): - task.states.append('start') - - -def success_handler(task, *args, **kwargs): - task.states.append('success') - - -def failure_handler(task, 
exception, *args, **kwargs): - task.states.append('failure') - task.exception = exception +@pytest.fixture(params=[ + (thread.ThreadExecutor, {'pool_size': 1}), + (thread.ThreadExecutor, {'pool_size': 2}), + # subprocess needs to load a tests module so we explicitly add the root directory as if + # the project has been installed in editable mode + (process.ProcessExecutor, {'python_path': [os.path.dirname(os.path.dirname(aria.__file__))]}), + # (celery.CeleryExecutor, {'app': app}) +]) +def executor(request): + executor_cls, executor_kwargs = request.param + result = executor_cls(**executor_kwargs) + yield result + result.close() + + +@pytest.fixture(autouse=True) +def register_signals(): + def start_handler(task, *args, **kwargs): + task.states.append('start') + + def success_handler(task, *args, **kwargs): + task.states.append('success') + + def failure_handler(task, exception, *args, **kwargs): + task.states.append('failure') + task.exception = exception + events.start_task_signal.connect(start_handler) + events.on_success_task_signal.connect(success_handler) + events.on_failure_task_signal.connect(failure_handler) + yield + events.start_task_signal.disconnect(start_handler) + events.on_success_task_signal.disconnect(success_handler) + events.on_failure_task_signal.disconnect(failure_handler) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py new file mode 100644 index 0000000..364d354 --- /dev/null +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -0,0 +1,130 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import uuid +import Queue +from contextlib import contextmanager + +import pytest + +from aria import application_model_storage +from aria.storage import models +from aria.utils.plugin import create as create_plugin +from aria.storage.sql_mapi import SQLAlchemyModelAPI +from aria.orchestrator import events +from aria.orchestrator import plugin +from aria.orchestrator.workflows.executor import process + + +import tests.storage +import tests.resources + + +class TestProcessExecutor(object): + + def test_plugin_execution(self, executor, mock_plugin): + task = MockTask(plugin=mock_plugin, + operation='mock_plugin1.operation') + + queue = Queue.Queue() + + def handler(_, exception=None): + queue.put(exception) + + events.on_success_task_signal.connect(handler) + events.on_failure_task_signal.connect(handler) + try: + executor.execute(task) + error = queue.get(timeout=60) + # tests/resources/plugins/mock-plugin1 is the plugin installed + # during this tests setup. The module mock_plugin1 contains a single + # operation named "operation" which calls an entry point defined in the plugin's + # setup.py. This entry points simply prints 'mock-plugin-output' to stdout. 
+ # The "operation" operation that called this subprocess, then raises a RuntimeError + # with that subprocess output as the error message. + # This is what we assert here. This tests checks that both the PYTHONPATH (operation) + # and PATH (entry point) are properly updated in the subprocess in which the task is + # running. + assert isinstance(error, RuntimeError) + assert error.message == 'mock-plugin-output' + finally: + events.on_success_task_signal.disconnect(handler) + events.on_failure_task_signal.disconnect(handler) + + def test_closed(self, executor): + executor.close() + with pytest.raises(RuntimeError) as exc_info: + executor.execute(task=None) + assert 'closed' in exc_info.value.message + + +@pytest.fixture +def model(tmpdir): + api_kwargs = tests.storage.get_sqlite_api_kwargs(str(tmpdir)) + result = application_model_storage(SQLAlchemyModelAPI, api_kwargs=api_kwargs) + yield result + tests.storage.release_sqlite_storage(result) + + +@pytest.fixture +def plugins_dir(tmpdir): + result = tmpdir.join('plugins') + result.mkdir() + return str(result) + + +@pytest.fixture +def plugin_manager(model, plugins_dir): + return plugin.PluginManager(model=model, plugins_dir=plugins_dir) + + +@pytest.fixture +def executor(plugin_manager): + result = process.ProcessExecutor(plugin_manager=plugin_manager) + yield result + result.close() + + +@pytest.fixture +def mock_plugin(plugin_manager, tmpdir): + source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1') + plugin_path = create_plugin(source=source, destination_dir=str(tmpdir)) + return plugin_manager.install(source=plugin_path) + + +class MockTask(object): + + INFINITE_RETRIES = models.Task.INFINITE_RETRIES + + def __init__(self, plugin, operation): + self.id = str(uuid.uuid4()) + self.operation_mapping = operation + self.logger = logging.getLogger() + self.name = operation + self.inputs = {} + self.context = None + self.retry_count = 0 + self.max_attempts = 1 + self.plugin_id = plugin.id + self.plugin = 
plugin + + for state in models.Task.STATES: + setattr(self, state.upper(), state) + + @contextmanager + def _update(self): + yield self http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/resources/__init__.py ---------------------------------------------------------------------- diff --git a/tests/resources/__init__.py b/tests/resources/__init__.py new file mode 100644 index 0000000..3ed601f --- /dev/null +++ b/tests/resources/__init__.py @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + + +DIR = os.path.dirname(__file__) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/resources/plugins/mock-plugin1/mock_plugin1.py ---------------------------------------------------------------------- diff --git a/tests/resources/plugins/mock-plugin1/mock_plugin1.py b/tests/resources/plugins/mock-plugin1/mock_plugin1.py new file mode 100644 index 0000000..25a00d1 --- /dev/null +++ b/tests/resources/plugins/mock-plugin1/mock_plugin1.py @@ -0,0 +1,27 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import subprocess + + +def operation(**_): + process = subprocess.Popen(['mock-plugin1'], stdout=subprocess.PIPE) + output, _ = process.communicate() + assert not process.poll() + raise RuntimeError(output.strip()) + + +def console_script_entry_point(): + print 'mock-plugin-output' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/resources/plugins/mock-plugin1/setup.py ---------------------------------------------------------------------- diff --git a/tests/resources/plugins/mock-plugin1/setup.py b/tests/resources/plugins/mock-plugin1/setup.py new file mode 100644 index 0000000..88d354d --- /dev/null +++ b/tests/resources/plugins/mock-plugin1/setup.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +from setuptools import setup + + +setup( + name='mock-plugin1', + version='0.1', + py_modules=['mock_plugin1'], + entry_points={ + 'console_scripts': [ + 'mock-plugin1 = mock_plugin1:console_script_entry_point' + ] + } +) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/storage/test_models.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py index 0ae5d1c..0651957 100644 --- a/tests/storage/test_models.py +++ b/tests/storage/test_models.py @@ -111,7 +111,9 @@ def _node_instances_storage(): def _execution_storage(): storage = _deployment_storage() execution = mock.models.get_execution(storage.deployment.list()[0]) + plugin = mock.models.get_plugin() storage.execution.put(execution) + storage.plugin.put(plugin) return storage @@ -531,34 +533,31 @@ class TestNode(object): @pytest.mark.parametrize( 'is_valid, name, deploy_number_of_instances, max_number_of_instances, ' 'min_number_of_instances, number_of_instances, planned_number_of_instances, plugins, ' - 'plugins_to_install, properties, operations, type, type_hierarchy', + 'properties, operations, type, type_hierarchy', [ - (False, m_cls, 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []), - (False, 'name', m_cls, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []), - (False, 'name', 1, m_cls, 1, 1, 1, {}, {}, {}, {}, 'type', []), - (False, 'name', 1, 1, m_cls, 1, 1, {}, {}, {}, {}, 'type', []), - (False, 'name', 1, 1, 1, m_cls, 1, {}, {}, {}, {}, 'type', []), - (False, 'name', 1, 1, 1, 1, m_cls, {}, {}, {}, {}, 'type', []), - (False, 'name', 1, 1, 1, 1, 1, m_cls, {}, {}, {}, 'type', []), - (False, 'name', 1, 1, 1, 1, 1, {}, m_cls, {}, {}, 'type', []), - (False, 'name', 1, 1, 1, 1, 1, {}, {}, m_cls, {}, 'type', []), - (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, m_cls, 'type', []), - (False, 'name', 1, 1, 1, 1, 
1, {}, {}, {}, {}, m_cls, []), - (False, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', m_cls), - - (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []), - (True, 'name', 1, 1, 1, 1, 1, None, {}, {}, {}, 'type', []), - (True, 'name', 1, 1, 1, 1, 1, {}, None, {}, {}, 'type', []), - (True, 'name', 1, 1, 1, 1, 1, {}, {}, None, {}, 'type', []), - (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, None, 'type', []), - (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', []), - (True, 'name', 1, 1, 1, 1, 1, {}, {}, {}, {}, 'type', None), + (False, m_cls, 1, 1, 1, 1, 1, [], {}, {}, 'type', []), + (False, 'name', m_cls, 1, 1, 1, 1, [], {}, {}, 'type', []), + (False, 'name', 1, m_cls, 1, 1, 1, [], {}, {}, 'type', []), + (False, 'name', 1, 1, m_cls, 1, 1, [], {}, {}, 'type', []), + (False, 'name', 1, 1, 1, m_cls, 1, [], {}, {}, 'type', []), + (False, 'name', 1, 1, 1, 1, m_cls, [], {}, {}, 'type', []), + (False, 'name', 1, 1, 1, 1, 1, m_cls, {}, {}, 'type', []), + (False, 'name', 1, 1, 1, 1, 1, [], m_cls, {}, 'type', []), + (False, 'name', 1, 1, 1, 1, 1, [], {}, m_cls, 'type', []), + (False, 'name', 1, 1, 1, 1, 1, [], {}, {}, m_cls, []), + (False, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', m_cls), + + (True, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', []), + (True, 'name', 1, 1, 1, 1, 1, None, {}, {}, 'type', []), + (True, 'name', 1, 1, 1, 1, 1, [], None, {}, 'type', []), + (True, 'name', 1, 1, 1, 1, 1, [], {}, None, 'type', []), + (True, 'name', 1, 1, 1, 1, 1, [], {}, {}, 'type', None), ] ) def test_node_model_creation(self, deployment_storage, is_valid, name, deploy_number_of_instances, max_number_of_instances, min_number_of_instances, number_of_instances, - planned_number_of_instances, plugins, plugins_to_install, + planned_number_of_instances, plugins, properties, operations, type, type_hierarchy): node = _test_model( is_valid=is_valid, @@ -573,7 +572,6 @@ class TestNode(object): number_of_instances=number_of_instances, planned_number_of_instances=planned_number_of_instances, 
plugins=plugins, - plugins_to_install=plugins_to_install, properties=properties, operations=operations, type=type, @@ -713,58 +711,56 @@ class TestProviderContext(object): class TestPlugin(object): @pytest.mark.parametrize( 'is_valid, archive_name, distribution, distribution_release, ' - 'distribution_version, excluded_wheels, package_name, package_source, ' + 'distribution_version, package_name, package_source, ' 'package_version, supported_platform, supported_py_versions, uploaded_at, wheels', [ - (False, m_cls, 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver', - {}, {}, now, {}), - (False, 'arc_name', m_cls, 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver', - {}, {}, now, {}), - (False, 'arc_name', 'dis_name', m_cls, 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver', - {}, {}, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', m_cls, {}, 'pak_name', 'pak_src', 'pak_ver', - {}, {}, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', m_cls, 'pak_name', 'pak_src', - 'pak_ver', {}, {}, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, m_cls, 'pak_src', 'pak_ver', - {}, {}, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', m_cls, 'pak_ver', - {}, {}, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', m_cls, - {}, {}, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', m_cls, {}, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', {}, m_cls, now, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', {}, {}, m_cls, {}), - (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', {}, {}, now, m_cls), - - (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', {}, {}, now, {}), - (True, 'arc_name', None, 
'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver', - {}, {}, now, {}), - (True, 'arc_name', 'dis_name', None, 'dis_ver', {}, 'pak_name', 'pak_src', 'pak_ver', - {}, {}, now, {}), - (True, 'arc_name', 'dis_name', 'dis_rel', None, {}, 'pak_name', 'pak_src', 'pak_ver', - {}, {}, now, {}), - (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', None, 'pak_name', 'pak_src', - 'pak_ver', {}, {}, now, {}), - (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', None, 'pak_ver', - {}, {}, now, {}), - (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', None, - {}, {}, now, {}), - (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', None, {}, now, {}), - (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', {}, None, now, {}), - (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', {}, 'pak_name', 'pak_src', - 'pak_ver', {}, {}, now, {}), + (False, m_cls, 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (False, 'arc_name', m_cls, 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (False, 'arc_name', 'dis_name', m_cls, 'dis_ver', 'pak_name', 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (False, 'arc_name', 'dis_name', 'dis_rel', m_cls, 'pak_name', 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', m_cls, 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', m_cls, 'pak_ver', + 'sup_pla', [], now, []), + (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', m_cls, + 'sup_pla', [], now, []), + (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', m_cls, [], now, []), + (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', 'sup_pla', m_cls, now, []), + 
(False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', 'sup_pla', [], m_cls, []), + (False, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', 'sup_pla', [], now, m_cls), + + (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', 'sup_pla', [], now, []), + (True, 'arc_name', None, 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (True, 'arc_name', 'dis_name', None, 'dis_ver', 'pak_name', 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (True, 'arc_name', 'dis_name', 'dis_rel', None, 'pak_name', 'pak_src', 'pak_ver', + 'sup_pla', [], now, []), + (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', 'sup_pla', [], now, []), + (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', None, 'pak_ver', + 'sup_pla', [], now, []), + (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', None, + 'sup_pla', [], now, []), + (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', None, [], now, []), + (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', 'sup_pla', None, now, []), + (True, 'arc_name', 'dis_name', 'dis_rel', 'dis_ver', 'pak_name', 'pak_src', + 'pak_ver', 'sup_pla', [], now, []), ] ) def test_plugin_model_creation(self, empty_storage, is_valid, archive_name, distribution, - distribution_release, distribution_version, excluded_wheels, + distribution_release, distribution_version, package_name, package_source, package_version, supported_platform, supported_py_versions, uploaded_at, wheels): _test_model(is_valid=is_valid, @@ -776,7 +772,6 @@ class TestPlugin(object): distribution=distribution, distribution_release=distribution_release, distribution_version=distribution_version, - excluded_wheels=excluded_wheels, package_name=package_name, package_source=package_source, 
package_version=package_version, @@ -791,34 +786,36 @@ class TestTask(object): @pytest.mark.parametrize( 'is_valid, status, due_at, started_at, ended_at, max_attempts, retry_count, ' - 'retry_interval, ignore_failure, name, operation_mapping, inputs', + 'retry_interval, ignore_failure, name, operation_mapping, inputs, plugin_id', [ - (False, m_cls, now, now, now, 1, 1, 1, True, 'name', 'map', {}), - (False, Task.STARTED, m_cls, now, now, 1, 1, 1, True, 'name', 'map', {}), - (False, Task.STARTED, now, m_cls, now, 1, 1, 1, True, 'name', 'map', {}), - (False, Task.STARTED, now, now, m_cls, 1, 1, 1, True, 'name', 'map', {}), - (False, Task.STARTED, now, now, now, m_cls, 1, 1, True, 'name', 'map', {}), - (False, Task.STARTED, now, now, now, 1, m_cls, 1, True, 'name', 'map', {}), - (False, Task.STARTED, now, now, now, 1, 1, m_cls, True, 'name', 'map', {}), - (False, Task.STARTED, now, now, now, 1, 1, 1, True, m_cls, 'map', {}), - (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', m_cls, {}), - (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', m_cls), - - (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}), - (True, Task.STARTED, None, now, now, 1, 1, 1, True, 'name', 'map', {}), - (True, Task.STARTED, now, None, now, 1, 1, 1, True, 'name', 'map', {}), - (True, Task.STARTED, now, now, None, 1, 1, 1, True, 'name', 'map', {}), - (True, Task.STARTED, now, now, now, 1, None, 1, True, 'name', 'map', {}), - (True, Task.STARTED, now, now, now, 1, 1, None, True, 'name', 'map', {}), - (True, Task.STARTED, now, now, now, 1, 1, 1, None, 'name', 'map', {}), - (True, Task.STARTED, now, now, now, 1, 1, 1, True, None, 'map', {}), - (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', None, {}), - (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', None), + (False, m_cls, now, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'), + (False, Task.STARTED, m_cls, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'), + (False, 
Task.STARTED, now, m_cls, now, 1, 1, 1, True, 'name', 'map', {}, '1'), + (False, Task.STARTED, now, now, m_cls, 1, 1, 1, True, 'name', 'map', {}, '1'), + (False, Task.STARTED, now, now, now, m_cls, 1, 1, True, 'name', 'map', {}, '1'), + (False, Task.STARTED, now, now, now, 1, m_cls, 1, True, 'name', 'map', {}, '1'), + (False, Task.STARTED, now, now, now, 1, 1, m_cls, True, 'name', 'map', {}, '1'), + (False, Task.STARTED, now, now, now, 1, 1, 1, True, m_cls, 'map', {}, '1'), + (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', m_cls, {}, '1'), + (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', m_cls, '1'), + (False, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, m_cls), + + (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'), + (True, Task.STARTED, None, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'), + (True, Task.STARTED, now, None, now, 1, 1, 1, True, 'name', 'map', {}, '1'), + (True, Task.STARTED, now, now, None, 1, 1, 1, True, 'name', 'map', {}, '1'), + (True, Task.STARTED, now, now, now, 1, None, 1, True, 'name', 'map', {}, '1'), + (True, Task.STARTED, now, now, now, 1, 1, None, True, 'name', 'map', {}, '1'), + (True, Task.STARTED, now, now, now, 1, 1, 1, None, 'name', 'map', {}, '1'), + (True, Task.STARTED, now, now, now, 1, 1, 1, True, None, 'map', {}, '1'), + (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', None, {}, '1'), + (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', None, '1'), + (True, Task.STARTED, now, now, now, 1, 1, 1, True, 'name', 'map', {}, None), ] ) def test_task_model_creation(self, execution_storage, is_valid, status, due_at, started_at, ended_at, max_attempts, retry_count, retry_interval, - ignore_failure, name, operation_mapping, inputs): + ignore_failure, name, operation_mapping, inputs, plugin_id): task = _test_model( is_valid=is_valid, storage=execution_storage, @@ -837,9 +834,12 @@ class TestTask(object): name=name, 
operation_mapping=operation_mapping, inputs=inputs, + plugin_id=plugin_id, )) if is_valid: assert task.execution == execution_storage.execution.list()[0] + if task.plugin_id: + assert task.plugin == execution_storage.plugin.list()[0] def test_task_max_attempts_validation(self): def create_task(max_attempts): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/utils/__init__.py ---------------------------------------------------------------------- diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/utils/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5cf84eeb/tests/utils/test_plugin.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_plugin.py b/tests/utils/test_plugin.py new file mode 100644 index 0000000..6f2dd92 --- /dev/null +++ b/tests/utils/test_plugin.py @@ -0,0 +1,77 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os + +import pytest + +from aria import application_model_storage +from aria.orchestrator import exceptions +from aria.orchestrator import plugin +from aria.utils.plugin import create as create_plugin +from aria.storage.sql_mapi import SQLAlchemyModelAPI + +from .. import storage + + +PACKAGE_NAME = 'mock-plugin' +PACKAGE_VERSION = '100' + + +class TestPluginManager(object): + + def test_install(self, plugin_manager, mock_plugin, model, plugins_dir): + plugin = plugin_manager.install(mock_plugin) + assert plugin.package_name == PACKAGE_NAME + assert plugin.package_version == PACKAGE_VERSION + assert plugin == model.plugin.get(plugin.id) + plugin_prefix = os.path.join(plugins_dir, '{0}-{1}'.format(PACKAGE_NAME, PACKAGE_VERSION)) + assert os.path.isdir(plugin_prefix) + assert plugin_prefix == plugin_manager.get_plugin_prefix(plugin) + + def test_install_already_exits(self, plugin_manager, mock_plugin): + plugin_manager.install(mock_plugin) + with pytest.raises(exceptions.PluginAlreadyExistsError): + plugin_manager.install(mock_plugin) + + +@pytest.fixture +def model(): + api_kwargs = storage.get_sqlite_api_kwargs() + model = application_model_storage(SQLAlchemyModelAPI, api_kwargs=api_kwargs) + yield model + storage.release_sqlite_storage(model) + + +@pytest.fixture +def plugins_dir(tmpdir): + result = tmpdir.join('plugins') + result.mkdir() + return str(result) + + +@pytest.fixture +def 
plugin_manager(model, plugins_dir): + return plugin.PluginManager(model=model, plugins_dir=plugins_dir) + + +@pytest.fixture +def mock_plugin(tmpdir): + source_dir = tmpdir.join('mock_plugin') + source_dir.mkdir() + setup_py = source_dir.join('setup.py') + setup_py.write('from setuptools import setup; setup(name="{0}", version="{1}")' + .format(PACKAGE_NAME, PACKAGE_VERSION)) + return create_plugin(source=str(source_dir), destination_dir=str(tmpdir))