Repository: incubator-ariatosca Updated Branches: refs/heads/master 5cf84eebe -> d143772d1
ARIA-41 Provide (initial) means for serializing an operation context object Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/d143772d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/d143772d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/d143772d Branch: refs/heads/master Commit: d143772d1497a54e447c1739083ed030328a7a0b Parents: 5cf84ee Author: Dan Kilman <[email protected]> Authored: Mon Dec 19 17:05:54 2016 +0200 Committer: Dan Kilman <[email protected]> Committed: Wed Dec 21 19:38:59 2016 +0200 ---------------------------------------------------------------------- aria/__init__.py | 26 ++--- aria/orchestrator/context/common.py | 11 +- aria/orchestrator/context/operation.py | 21 ++-- aria/orchestrator/context/serialization.py | 94 +++++++++++++++++ aria/orchestrator/context/workflow.py | 13 ++- aria/orchestrator/workflows/core/task.py | 11 +- aria/orchestrator/workflows/events_logging.py | 6 +- aria/orchestrator/workflows/executor/process.py | 22 ++-- aria/storage/filesystem_rapi.py | 5 +- tests/__init__.py | 4 + tests/mock/context.py | 18 +++- tests/orchestrator/context/test_serialize.py | 101 +++++++++++++++++++ tests/orchestrator/workflows/core/test_task.py | 1 + .../workflows/executor/test_executor.py | 19 +++- .../workflows/executor/test_process_executor.py | 11 +- 15 files changed, 303 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 0f7bec6..cc362c0 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -41,8 +41,6 @@ __all__ = ( 'operation', ) -_resource_storage = {} - def install_aria_extensions(): """ @@ -61,7 +59,7 @@ def 
install_aria_extensions(): def application_model_storage(api, api_kwargs=None): """ - Initiate model storage for the supplied storage driver + Initiate model storage """ models = [ storage.models.Plugin, @@ -85,17 +83,15 @@ def application_model_storage(api, api_kwargs=None): return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {}) -def application_resource_storage(driver): +def application_resource_storage(api, api_kwargs=None): """ - Initiate resource storage for the supplied storage driver + Initiate resource storage """ - if driver not in _resource_storage: - _resource_storage[driver] = storage.ResourceStorage( - driver, - resources=[ - 'blueprint', - 'deployment', - 'plugin', - 'snapshot', - ]) - return _resource_storage[driver] + return storage.ResourceStorage( + api, + api_kwargs=api_kwargs or {}, + items=[ + 'blueprint', + 'deployment', + 'plugin', + ]) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 14efd9d..fdbe152 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -29,13 +29,9 @@ class BaseContext(logger.LoggerMixin): def __init__( self, name, + deployment_id, model_storage, resource_storage, - deployment_id, - workflow_name, - task_max_attempts=1, - task_retry_interval=0, - task_ignore_failure=False, **kwargs): super(BaseContext, self).__init__(**kwargs) self._name = name @@ -43,16 +39,11 @@ class BaseContext(logger.LoggerMixin): self._model = model_storage self._resource = resource_storage self._deployment_id = deployment_id - self._workflow_name = workflow_name - self._task_max_attempts = task_max_attempts - self._task_retry_interval = task_retry_interval - self._task_ignore_failure = task_ignore_failure def __repr__(self): return ( '{name}(name={self.name}, ' 
'deployment_id={self._deployment_id}, ' - 'workflow_name={self._workflow_name}, ' .format(name=self.__class__.__name__, self=self)) @property http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index a73bad1..19bb73a 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -26,17 +26,22 @@ class BaseOperationContext(BaseContext): Context object used during operation creation and execution """ - def __init__(self, name, workflow_context, task, actor, **kwargs): + def __init__(self, + name, + model_storage, + resource_storage, + deployment_id, + task_id, + actor_id, + **kwargs): super(BaseOperationContext, self).__init__( name=name, - model_storage=workflow_context.model, - resource_storage=workflow_context.resource, - deployment_id=workflow_context._deployment_id, - workflow_name=workflow_context._workflow_name, + model_storage=model_storage, + resource_storage=resource_storage, + deployment_id=deployment_id, **kwargs) - self._task_model = task - self._task_id = task.id - self._actor_id = actor.id + self._task_id = task_id + self._actor_id = actor_id def __repr__(self): details = 'operation_mapping={task.operation_mapping}; ' \ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/serialization.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py new file mode 100644 index 0000000..93cb38a --- /dev/null +++ b/aria/orchestrator/context/serialization.py @@ -0,0 +1,94 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. 
See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sqlalchemy.orm +import sqlalchemy.pool + +import aria + + +def operation_context_to_dict(context): + context_cls = context.__class__ + context_dict = { + 'name': context.name, + 'deployment_id': context._deployment_id, + 'task_id': context._task_id, + 'actor_id': context._actor_id, + } + if context.model: + model = context.model + context_dict['model_storage'] = { + 'api_cls': model.api, + 'api_kwargs': _serialize_sql_mapi_kwargs(model) + } + else: + context_dict['model_storage'] = None + if context.resource: + resource = context.resource + context_dict['resource_storage'] = { + 'api_cls': resource.api, + 'api_kwargs': _serialize_file_rapi_kwargs(resource) + } + else: + context_dict['resource_storage'] = None + return { + 'context_cls': context_cls, + 'context': context_dict + } + + +def operation_context_from_dict(context_dict): + context_cls = context_dict['context_cls'] + context = context_dict['context'] + + model_storage = context['model_storage'] + if model_storage: + api_cls = model_storage['api_cls'] + api_kwargs = _deserialize_sql_mapi_kwargs(model_storage.get('api_kwargs', {})) + context['model_storage'] = aria.application_model_storage(api=api_cls, + api_kwargs=api_kwargs) + + resource_storage = context['resource_storage'] + if resource_storage: + api_cls = 
resource_storage['api_cls'] + api_kwargs = _deserialize_file_rapi_kwargs(resource_storage.get('api_kwargs', {})) + context['resource_storage'] = aria.application_resource_storage(api=api_cls, + api_kwargs=api_kwargs) + + return context_cls(**context) + + +def _serialize_sql_mapi_kwargs(model): + engine_url = str(model._api_kwargs['engine'].url) + assert ':memory:' not in engine_url + return {'engine_url': engine_url} + + +def _deserialize_sql_mapi_kwargs(api_kwargs): + engine_url = api_kwargs.get('engine_url') + if not engine_url: + return {} + engine = sqlalchemy.create_engine(engine_url) + session_factory = sqlalchemy.orm.sessionmaker(bind=engine) + session = sqlalchemy.orm.scoped_session(session_factory=session_factory) + return {'session': session, 'engine': engine} + + +def _serialize_file_rapi_kwargs(resource): + return {'directory': resource._api_kwargs['directory']} + + +def _deserialize_file_rapi_kwargs(api_kwargs): + return api_kwargs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index e2e8e25..e3be2d5 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -29,9 +29,20 @@ class WorkflowContext(BaseContext): """ Context object used during workflow creation and execution """ - def __init__(self, parameters=None, execution_id=None, *args, **kwargs): + def __init__(self, + workflow_name, + parameters=None, + execution_id=None, + task_max_attempts=1, + task_retry_interval=0, + task_ignore_failure=False, + *args, **kwargs): super(WorkflowContext, self).__init__(*args, **kwargs) + self._workflow_name = workflow_name self.parameters = parameters or {} + self._task_max_attempts = task_max_attempts + self._task_retry_interval = task_retry_interval + self._task_ignore_failure = task_ignore_failure # 
TODO: execution creation should happen somewhere else # should be moved there, when such logical place exists self._execution_id = self._create_execution() if execution_id is None else execution_id http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 08cf26e..663eeac 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -134,14 +134,17 @@ class OperationTask(BaseTask): max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, ignore_failure=api_task.ignore_failure, - plugin_id=plugin_id + plugin_id=plugin_id, + execution_id=self._workflow_context.execution.id ) self._workflow_context.model.task.put(operation_task) self._ctx = context_class(name=api_task.name, - workflow_context=self._workflow_context, - task=operation_task, - actor=operation_task.actor) + model_storage=self._workflow_context.model, + resource_storage=self._workflow_context.resource, + deployment_id=self._workflow_context._deployment_id, + task_id=operation_task.id, + actor_id=api_task.actor.id) self._task_id = operation_task.id self._update_fields = None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 409ce0a..142ef74 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -35,9 +35,9 @@ def _success_task_handler(task, **kwargs): @events.on_failure_task_signal.connect -def _failure_operation_handler(task, **kwargs): - task.logger.error('Event: Task failure: {task.name}'.format(task=task), - 
exc_info=kwargs.get('exception', True)) +def _failure_operation_handler(task, exception, **kwargs): + error = '{0}: {1}'.format(type(exception).__name__, exception) + task.logger.error('Event: Task failure: {task.name} [{error}]'.format(task=task, error=error)) @events.start_workflow_signal.connect http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 1a47d4c..5da03dd 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -41,6 +41,7 @@ import jsonpickle from aria.utils import imports from aria.orchestrator.workflows.executor import base +from aria.orchestrator.context import serialization _IS_WIN = os.name == 'nt' @@ -106,12 +107,7 @@ class ProcessExecutor(base.BaseExecutor): file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') os.close(file_descriptor) with open(arguments_json_path, 'wb') as f: - f.write(jsonpickle.dumps({ - 'task_id': task.id, - 'operation_mapping': task.operation_mapping, - 'operation_inputs': task.inputs, - 'port': self._server_port - })) + f.write(jsonpickle.dumps(self._create_arguments_dict(task))) env = os.environ.copy() # See _update_env for plugin_prefix usage @@ -176,6 +172,15 @@ class ProcessExecutor(base.BaseExecutor): if self._stopped: raise RuntimeError('Executor closed') + def _create_arguments_dict(self, task): + return { + 'task_id': task.id, + 'operation_mapping': task.operation_mapping, + 'operation_inputs': task.inputs, + 'port': self._server_port, + 'context': serialization.operation_context_to_dict(task.context), + } + def _update_env(self, env, plugin_prefix): pythonpath_dirs = [] # If this is a plugin operation, plugin prefix will point to where @@ -261,12 +266,13 @@ def 
_main(): task_id = arguments['task_id'] port = arguments['port'] messenger = _Messenger(task_id=task_id, port=port) + messenger.started() operation_mapping = arguments['operation_mapping'] operation_inputs = arguments['operation_inputs'] - ctx = None - messenger.started() + context_dict = arguments['context'] try: + ctx = serialization.operation_context_from_dict(context_dict) task_func = imports.load_attribute(operation_mapping) task_func(ctx=ctx, **operation_inputs) messenger.succeeded() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/storage/filesystem_rapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py index f810f58..c6b3a81 100644 --- a/aria/storage/filesystem_rapi.py +++ b/aria/storage/filesystem_rapi.py @@ -87,7 +87,10 @@ class FileSystemResourceAPI(api.ResourceAPI): os.makedirs(self.directory) except (OSError, IOError): pass - os.makedirs(self.base_path) + try: + os.makedirs(self.base_path) + except (OSError, IOError): + pass def read(self, entry_id, path=None, **_): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/__init__.py ---------------------------------------------------------------------- diff --git a/tests/__init__.py b/tests/__init__.py index ae1e83e..d2858d2 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -12,3 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
+ +import os + +ROOT_DIR = os.path.dirname(os.path.dirname(__file__)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index 1904140..5559675 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -13,15 +13,16 @@ # See the License for the specific language governing permissions and # limitations under the License. -from aria import application_model_storage +import aria from aria.orchestrator import context +from aria.storage.filesystem_rapi import FileSystemResourceAPI from aria.storage.sql_mapi import SQLAlchemyModelAPI from . import models -def simple(api_kwargs, **kwargs): - model_storage = application_model_storage(SQLAlchemyModelAPI, api_kwargs=api_kwargs) +def simple(mapi_kwargs, resources_dir=None, **kwargs): + model_storage = aria.application_model_storage(SQLAlchemyModelAPI, api_kwargs=mapi_kwargs) blueprint = models.get_blueprint() model_storage.blueprint.put(blueprint) deployment = models.get_deployment(blueprint) @@ -56,10 +57,19 @@ def simple(api_kwargs, **kwargs): ) model_storage.relationship_instance.put(relationship_instance) + # pytest tmpdir + if resources_dir: + resource_storage = aria.application_resource_storage( + FileSystemResourceAPI, + api_kwargs={'directory': resources_dir} + ) + else: + resource_storage = None + final_kwargs = dict( name='simple_context', model_storage=model_storage, - resource_storage=None, + resource_storage=resource_storage, deployment_id=deployment.id, workflow_name=models.WORKFLOW_NAME, task_max_attempts=models.TASK_MAX_ATTEMPTS, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py new file mode 
100644 index 0000000..ed0afcd --- /dev/null +++ b/tests/orchestrator/context/test_serialize.py @@ -0,0 +1,101 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import aria +from aria.storage.sql_mapi import SQLAlchemyModelAPI +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.executor import process +from aria.orchestrator import workflow, operation +from aria.orchestrator.context import serialization + +import tests +from tests import mock +from tests import storage + +TEST_FILE_CONTENT = 'CONTENT' +TEST_FILE_ENTRY_ID = 'entry' +TEST_FILE_NAME = 'test_file' + + +def test_serialize_operation_context(context, executor, tmpdir): + test_file = tmpdir.join(TEST_FILE_NAME) + test_file.write(TEST_FILE_CONTENT) + resource = context.resource + resource.blueprint.upload(TEST_FILE_ENTRY_ID, str(test_file)) + graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) + eng.execute() + + +def test_illegal_serialize_of_memory_model_storage(memory_model_storage): + with pytest.raises(AssertionError): + 
serialization._serialize_sql_mapi_kwargs(memory_model_storage) + + +@workflow +def _mock_workflow(ctx, graph): + node_instance = ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + node_instance.node.operations['test.op'] = {'operation': _operation_mapping()} + task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op') + graph.add_tasks(task) + return graph + + +@operation +def _mock_operation(ctx): + # We test several things in this operation + # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created + # a correct ctx.task.operation_mapping tells us we kept the correct task_id + assert ctx.task.operation_mapping == _operation_mapping() + # a correct ctx.node.name tells us we kept the correct actor_id + assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME + # a correct ctx.name tells us we kept the correct name + assert ctx.name is not None + assert ctx.name == ctx.task.name + # a correct ctx.deployment.name tells us we kept the correct deployment_id + assert ctx.deployment.name == mock.models.DEPLOYMENT_NAME + # Here we test that the resource storage was properly re-created + test_file_content = ctx.resource.blueprint.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME) + assert test_file_content == TEST_FILE_CONTENT + + +def _operation_mapping(): + return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation) + + +@pytest.fixture +def executor(): + result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) + yield result + result.close() + + +@pytest.fixture +def context(tmpdir): + result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)), + resources_dir=str(tmpdir.join('resources'))) + yield result + storage.release_sqlite_storage(result.model) + + +@pytest.fixture +def memory_model_storage(): + result = aria.application_model_storage( + SQLAlchemyModelAPI, api_kwargs=storage.get_sqlite_api_kwargs()) + yield result + storage.release_sqlite_storage(result) 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index 6c3825c..fc11548 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -60,6 +60,7 @@ class TestOperationTask(object): node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 'plugin1'} api_task, core_task = self._create_operation_task(ctx, node_instance) storage_task = ctx.model.task.get_by_name(core_task.name) + assert storage_task.execution_id == ctx.execution.id assert core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.operation_mapping == api_task.operation_mapping http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 7a11524..5ded4fb 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -14,7 +14,6 @@ # limitations under the License. 
import logging -import os import uuid from contextlib import contextmanager @@ -29,7 +28,6 @@ except ImportError: _celery = None app = None -import aria from aria.storage import models from aria.orchestrator import events from aria.orchestrator.workflows.executor import ( @@ -38,6 +36,8 @@ from aria.orchestrator.workflows.executor import ( # celery ) +import tests + def test_execute(executor): expected_value = 'value' @@ -80,11 +80,20 @@ class MockException(Exception): pass +class MockContext(object): + + def __init__(self, *args, **kwargs): + pass + + def __getattr__(self, item): + return None + + class MockTask(object): INFINITE_RETRIES = models.Task.INFINITE_RETRIES - def __init__(self, func, inputs=None, ctx=None): + def __init__(self, func, inputs=None): self.states = [] self.exception = None self.id = str(uuid.uuid4()) @@ -94,7 +103,7 @@ class MockTask(object): self.logger = logging.getLogger() self.name = name self.inputs = inputs or {} - self.context = ctx + self.context = MockContext() self.retry_count = 0 self.max_attempts = 1 self.plugin_id = None @@ -112,7 +121,7 @@ class MockTask(object): (thread.ThreadExecutor, {'pool_size': 2}), # subprocess needs to load a tests module so we explicitly add the root directory as if # the project has been installed in editable mode - (process.ProcessExecutor, {'python_path': [os.path.dirname(os.path.dirname(aria.__file__))]}), + (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), # (celery.CeleryExecutor, {'app': app}) ]) def executor(request): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index 364d354..0098f30 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ 
b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -106,6 +106,15 @@ def mock_plugin(plugin_manager, tmpdir): return plugin_manager.install(source=plugin_path) +class MockContext(object): + + def __init__(self, *args, **kwargs): + pass + + def __getattr__(self, item): + return None + + class MockTask(object): INFINITE_RETRIES = models.Task.INFINITE_RETRIES @@ -116,7 +125,7 @@ class MockTask(object): self.logger = logging.getLogger() self.name = operation self.inputs = {} - self.context = None + self.context = MockContext() self.retry_count = 0 self.max_attempts = 1 self.plugin_id = plugin.id
