Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks f36fe86c1 -> 62fa985c6
wip2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/62fa985c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/62fa985c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/62fa985c Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: 62fa985c6bf1e888710dd109a35cc122d7841ba2 Parents: f36fe86 Author: max-orlov <[email protected]> Authored: Mon Jun 12 19:47:50 2017 +0300 Committer: max-orlov <[email protected]> Committed: Mon Jun 12 19:47:50 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 68 ++- aria/orchestrator/workflows/core/engine.py | 13 +- .../workflows/core/events_handler.py | 72 +-- aria/orchestrator/workflows/core/translation.py | 6 +- aria/orchestrator/workflows/events_logging.py | 25 +- aria/orchestrator/workflows/executor/base.py | 31 +- aria/orchestrator/workflows/executor/process.py | 18 +- aria/orchestrator/workflows/executor/thread.py | 19 +- tests/orchestrator/context/__init__.py | 5 +- tests/orchestrator/context/test_operation.py | 14 +- .../orchestrator/execution_plugin/test_local.py | 4 +- .../orchestrator/workflows/core/_test_engine.py | 519 ------------------- .../orchestrator/workflows/core/test_engine.py | 519 +++++++++++++++++++ .../orchestrator/workflows/core/test_events.py | 2 +- .../orchestrator/workflows/executor/__init__.py | 22 +- .../workflows/executor/test_executor.py | 2 +- 16 files changed, 665 insertions(+), 674 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 36f1421..9ac885d 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -21,6 +21,7 @@ classes: """ # pylint: disable=no-self-argument, no-member, abstract-method +from contextlib import contextmanager from datetime import datetime from sqlalchemy import ( @@ -308,25 +309,8 @@ class TaskBase(mixins.ModelMixin): api_id = Column(String) _executor = Column(PickleType) - _executor_kwargs = Column(PickleType, default=None) - - _context = Column(PickleType) - _context_kwargs = Column(PickleType, default=None) - - @property - def executor(self): - return self._executor(**(self._executor_kwargs or {})) - - @property - def context(self): - return self._context.instantiate_from_dict(task_id=self.id, **self._context_kwargs) - - def execute(self): - executor = self.executor - try: - return executor.execute(self) - finally: - executor.close() + _executor_kwargs = Column(modeling_types.Dict) + _context_cls = Column(PickleType) @declared_attr def logs(cls): @@ -438,17 +422,14 @@ class TaskBase(mixins.ModelMixin): return self.status in (self.PENDING, self.RETRYING) @classmethod - def from_api_task(cls, api_task, executor, executor_kwargs=None): - from aria.modeling import models + def from_api_task(cls, api_task, executor, executor_kwargs=None, **kwargs): from aria.orchestrator import context + instantiation_kwargs = {} - instantiation_kwargs = {'_executor': executor, - '_executor_kwargs': executor_kwargs} - - if isinstance(api_task.actor, models.Node): + if hasattr(api_task.actor, 'outbound_relationships'): context_cls = context.operation.NodeOperationContext instantiation_kwargs['node'] = api_task.actor - elif isinstance(api_task.actor, models.Relationship): + elif hasattr(api_task.actor, 'source_node'): context_cls = context.operation.RelationshipOperationContext instantiation_kwargs['relationship'] = api_task.actor else: @@ -471,21 +452,34 @@ class TaskBase(mixins.ModelMixin): 'function': api_task.function, 'arguments': api_task.arguments, 'api_id': api_task.id, - - '_context': context_cls, - '_context_kwargs': { - 'name': api_task.name, - 'model_storage': api_task._workflow_context.model.serialization_dict, - 'resource_storage': api_task._workflow_context.resource.serialization_dict, - 'service_id': api_task._workflow_context._service_id, - 'actor_id': api_task.id, - 'execution_id': api_task._workflow_context._execution_id, - 'workdir': api_task._workflow_context._workdir - } + '_context_cls': context_cls, + '_executor': executor, + '_executor_kwargs': executor_kwargs or {} }) + instantiation_kwargs.update(**kwargs) + return cls(**instantiation_kwargs) + def execute(self, ctx): + from aria.orchestrator.context import operation + context_cls = self._context_cls or operation.BaseOperationContext + op_ctx = context_cls( + model_storage=ctx.model, + resource_storage=ctx.resource, + workdir=ctx._workdir, + task_id=self.id, + actor_id=self.actor.id if self.actor else None, + service_id=self.execution.service.id, + execution_id=self.execution.id, + name=self.name + ) + executor = self._executor(**(self._executor_kwargs or {})) + try: + return executor.execute(op_ctx) + finally: + executor.close() + class LogBase(mixins.ModelMixin): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 120d83a..02749b7 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -43,12 +43,12 @@ class Engine(logger.LoggerMixin): self._execution_graph = networkx.DiGraph() translation.build_execution_graph(task_graph=tasks_graph, execution_graph=self._execution_graph, - executor_kwargs=executor_kwargs, default_executor=executor, + executor_kwargs=executor_kwargs, execution=workflow_context.execution) - # Update the storage - workflow_context.execution = workflow_context.execution + # Flush changes + workflow_context.model.execution._session.flush() def execute(self): """ @@ -99,7 +99,7 @@ class Engine(logger.LoggerMixin): return (task for task in self._tasks_iter() if task.has_ended()) def _task_has_dependencies(self, task): - return len(self._execution_graph.pred.get(task.id, {})) > 0 + return len(self._execution_graph.pred.get(task.api_id, {})) > 0 def _all_tasks_consumed(self): return len(self._execution_graph.node) == 0 @@ -112,11 +112,10 @@ class Engine(logger.LoggerMixin): self._workflow_context.model.task.refresh(task) yield task - @staticmethod - def _handle_executable_task(task): + def _handle_executable_task(self, task): if not task.stub_type: events.sent_task_signal.send(task) - task.execute() + task.execute(self._workflow_context) def _handle_ended_tasks(self, task): if task.status == models.Task.FAILED and not task.ignore_failure: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index c733e79..5d979c4 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -35,44 +35,43 @@ def _task_sent(task, *args, **kwargs): @events.start_task_signal.connect -def _task_started(task, *args, **kwargs): - task.started_at = datetime.utcnow() - task.status = task.STARTED - _update_node_state_if_necessary(task, is_transitional=True) +def _task_started(ctx, *args, **kwargs): + ctx.task.started_at = datetime.utcnow() + ctx.task.status = ctx.task.STARTED + _update_node_state_if_necessary(ctx, is_transitional=True) @events.on_failure_task_signal.connect -def _task_failed(task, exception, *args, **kwargs): - with task._update(): - should_retry = all([ - not isinstance(exception, exceptions.TaskAbortException), - task.attempts_count < task.max_attempts or task.max_attempts == task.INFINITE_RETRIES, - # ignore_failure check here means the task will not be retries and it will be marked - # as failed. The engine will also look at ignore_failure so it won't fail the - # workflow. - not task.ignore_failure - ]) - if should_retry: - retry_interval = None - if isinstance(exception, exceptions.TaskRetryException): - retry_interval = exception.retry_interval - if retry_interval is None: - retry_interval = task.retry_interval - task.status = task.RETRYING - task.attempts_count += 1 - task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) - else: - task.ended_at = datetime.utcnow() - task.status = task.FAILED +def _task_failed(ctx, exception, *args, **kwargs): + should_retry = all([ + not isinstance(exception, exceptions.TaskAbortException), + ctx.task.attempts_count < ctx.task.max_attempts or + ctx.task.max_attempts == ctx.task.INFINITE_RETRIES, + # ignore_failure check here means the task will not be retries and it will be marked + # as failed. The engine will also look at ignore_failure so it won't fail the + # workflow. + not ctx.task.ignore_failure + ]) + if should_retry: + retry_interval = None + if isinstance(exception, exceptions.TaskRetryException): + retry_interval = exception.retry_interval + if retry_interval is None: + retry_interval = ctx.task.retry_interval + ctx.task.status = ctx.task.RETRYING + ctx.task.attempts_count += 1 + ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval) + else: + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.FAILED @events.on_success_task_signal.connect -def _task_succeeded(task, *args, **kwargs): - with task._update(): - task.ended_at = datetime.utcnow() - task.status = task.SUCCESS +def _task_succeeded(ctx, *args, **kwargs): + ctx.task.ended_at = datetime.utcnow() + ctx.task.status = ctx.task.SUCCESS - _update_node_state_if_necessary(task) + _update_node_state_if_necessary(ctx) @events.start_workflow_signal.connect @@ -131,16 +130,17 @@ def _workflow_cancelling(workflow_context, *args, **kwargs): workflow_context.execution = execution -def _update_node_state_if_necessary(task, is_transitional=False): +def _update_node_state_if_necessary(ctx, is_transitional=False): # TODO: this is not the right way to check! the interface name is arbitrary # and also will *never* be the type name - node = task.node if task is not None else None + node = ctx.task.node if ctx.task is not None else None if (node is not None) and \ - (task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')): - state = node.determine_state(op_name=task.operation_name, is_transitional=is_transitional) + (ctx.task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')): + state = node.determine_state(op_name=ctx.task.operation_name, + is_transitional=is_transitional) if state: node.state = state - task.context.model.node.update(node) + ctx.model.node.update(node) def _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, status): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index 50fd65b..93c173e 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -26,8 +26,8 @@ def build_execution_graph( task_graph, execution_graph, default_executor, + executor_kwargs, execution, - executor_kwargs=None, start_stub_type=models.Task.START_WORKFLOW, end_stub_type=models.Task.END_WORKFLOW, depends_on=()): @@ -63,8 +63,8 @@ def build_execution_graph( task_graph=api_task, execution_graph=execution_graph, default_executor=default_executor, - execution=execution, executor_kwargs=executor_kwargs, + execution=execution, start_stub_type=models.Task.START_SUBWROFKLOW, end_stub_type=models.Task.END_SUBWORKFLOW, depends_on=operation_dependencies @@ -73,7 +73,7 @@ def build_execution_graph( stub_task = models.Task(api_id=api_task.id, _executor=base.StubTaskExecutor, execution=execution, - stub_type=models.StubTask.STUB) + stub_type=models.Task.STUB) _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) else: raise RuntimeError('Undefined state') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 036c1f7..4cee867 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -34,31 +34,32 @@ def _get_task_name(task): @events.start_task_signal.connect -def _start_task_handler(task, **kwargs): +def _start_task_handler(ctx, **kwargs): # If the task has no function this is an empty task. - if task.function: + if ctx.task.function: suffix = 'started...' - logger = task.context.logger.info + logger = ctx.logger.info else: suffix = 'has no implementation' - logger = task.context.logger.debug + logger = ctx.logger.debug logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format( - name=_get_task_name(task), task=task, suffix=suffix)) + name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix)) + @events.on_success_task_signal.connect -def _success_task_handler(task, **kwargs): - if not task.function: +def _success_task_handler(ctx, **kwargs): + if not ctx.task.function: return - task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful' - .format(name=_get_task_name(task), task=task)) + ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful' + .format(name=_get_task_name(ctx.task), task=ctx.task)) @events.on_failure_task_signal.connect -def _failure_operation_handler(task, traceback, **kwargs): - task.context.logger.error( +def _failure_operation_handler(ctx, traceback, **kwargs): + ctx.logger.error( '{name} {task.interface_name}.{task.operation_name} failed' - .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback) + .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback) ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index 7fece6f..69e4a39 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -28,19 +28,21 @@ class BaseExecutor(logger.LoggerMixin): def _execute(self, task): raise NotImplementedError - def execute(self, task): + def execute(self, ctx): """ Execute a task :param task: task to execute """ - if task.function: - self._execute(task) + if ctx.task.function: + self._execute(ctx) + ctx.model.task.update(ctx.task) else: # In this case the task is missing a function. This task still gets to an # executor, but since there is nothing to run, we by default simply skip the execution # itself. - self._task_started(task) - self._task_succeeded(task) + self._task_started(ctx) + self._task_succeeded(ctx) + ctx.model.task.update(ctx.task) def close(self): """ @@ -49,18 +51,21 @@ class BaseExecutor(logger.LoggerMixin): pass @staticmethod - def _task_started(task): - events.start_task_signal.send(task) + def _task_started(ctx): + events.start_task_signal.send(ctx) + ctx.model.task.update(ctx.task) @staticmethod - def _task_failed(task, exception, traceback=None): - events.on_failure_task_signal.send(task, exception=exception, traceback=traceback) + def _task_failed(ctx, exception, traceback=None): + events.on_failure_task_signal.send(ctx, exception=exception, traceback=traceback) + ctx.model.task.update(ctx.task) @staticmethod - def _task_succeeded(task): - events.on_success_task_signal.send(task) + def _task_succeeded(ctx): + events.on_success_task_signal.send(ctx) + ctx.model.task.update(ctx.task) class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method - def execute(self, task): - task.status = task.SUCCESS + def execute(self, ctx): + ctx.task.status = ctx.task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 634f1f2..8518b33 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -113,17 +113,17 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) - def _execute(self, task): + def _execute(self, ctx): self._check_closed() - self._tasks[task.id] = task + self._tasks[ctx.task.id] = ctx # Temporary file used to pass arguments to the started subprocess file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json') os.close(file_descriptor) with open(arguments_json_path, 'wb') as f: - f.write(pickle.dumps(self._create_arguments_dict(task))) + f.write(pickle.dumps(self._create_arguments_dict(ctx))) - env = self._construct_subprocess_env(task=task) + env = self._construct_subprocess_env(task=ctx.task) # Asynchronously start the operation in a subprocess subprocess.Popen( '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path), @@ -137,13 +137,13 @@ class ProcessExecutor(base.BaseExecutor): if self._stopped: raise RuntimeError('Executor closed') - def _create_arguments_dict(self, task): + def _create_arguments_dict(self, ctx): return { - 'task_id': task.id, - 'function': task.function, - 'operation_arguments': dict(arg.unwrapped for arg in task.arguments.values()), + 'task_id': ctx.task.id, + 'function': ctx.task.function, + 'operation_arguments': dict(arg.unwrapped for arg in ctx.task.arguments.values()), 'port': self._server_port, - 'context': task.context.serialization_dict, + 'context': ctx.serialization_dict, } def _construct_subprocess_env(self, task): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index a44499e..8c447b6 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -46,8 +46,8 @@ class ThreadExecutor(BaseExecutor): thread.start() self._pool.append(thread) - def _execute(self, task): - self._queue.put(task) + def _execute(self, ctx): + self._queue.put(ctx) def close(self): self._stopped = True @@ -57,16 +57,15 @@ class ThreadExecutor(BaseExecutor): def _processor(self): while not self._stopped: try: - task = self._queue.get(timeout=1) - task = task.context.task - self._task_started(task) + ctx = self._queue.get(timeout=1) + self._task_started(ctx) try: - task_func = imports.load_attribute(task.function) - arguments = dict(arg.unwrapped for arg in task.arguments.values()) - task_func(ctx=task.context, **arguments) - self._task_succeeded(task) + task_func = imports.load_attribute(ctx.task.function) + arguments = dict(arg.unwrapped for arg in ctx.task.arguments.values()) + task_func(ctx=ctx, **arguments) + self._task_succeeded(ctx) except BaseException as e: - self._task_failed(task, + self._task_failed(ctx, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info())) # Daemon threads http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 4fde0a7..89a7b2c 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -23,7 +23,8 @@ def op_path(func, module_path=None): return '{0}.{1}'.format(module_path, func.__name__) -def execute(workflow_func, workflow_context, executor): +def execute(workflow_func, workflow_context, executor, executor_kwargs=None): graph = workflow_func(ctx=workflow_context) - eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) + eng = engine.Engine(executor=executor, executor_kwargs=executor_kwargs, + workflow_context=workflow_context, tasks_graph=graph) eng.execute() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 3dcfaa2..1cb8f65 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -263,15 +263,10 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): @pytest.fixture(params=[ (thread.ThreadExecutor, {}), - (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), + # (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}), ]) def executor(request): - executor_cls, executor_kwargs = request.param - result = executor_cls(**executor_kwargs) - try: - yield result - finally: - result.close() + return request.param def test_node_operation_logging(ctx, executor): @@ -305,12 +300,13 @@ def test_node_operation_logging(ctx, executor): ) ) - execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor[0], + executor_kwargs=dict(ctx=ctx)) _assert_loggins(ctx, arguments) def test_relationship_operation_logging(ctx, executor): - interface_name, operation_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[0] + interface_name, operation_name = mock.operations.RELATIONHIP_OPERATIONS_INSTALL[0] relationship = ctx.model.relationship.list()[0] arguments = { http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index d792a57..8e8c6e0 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -508,9 +508,7 @@ if __name__ == '__main__': @pytest.fixture def executor(self): - result = process.ProcessExecutor() - yield result - result.close() + return process.ProcessExecutor @pytest.fixture def workflow_context(self, tmpdir): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/core/_test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/_test_engine.py b/tests/orchestrator/workflows/core/_test_engine.py deleted file mode 100644 index 7ffb92a..0000000 --- a/tests/orchestrator/workflows/core/_test_engine.py +++ /dev/null @@ -1,519 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import time -import threading -from datetime import datetime - -import pytest - -from aria.orchestrator import ( - events, - workflow, - operation, -) -from aria.modeling import models -from aria.orchestrator.workflows import ( - api, - exceptions, -) -from aria.orchestrator.workflows.core import engine -from aria.orchestrator.workflows.executor import thread - -from tests import mock, storage - - -global_test_holder = {} - - -class BaseTest(object): - - @classmethod - def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None): - eng = cls._engine(workflow_func=workflow_func, - workflow_context=workflow_context, - executor=executor, - executor_kwargs=executor_kwargs) - eng.execute() - return eng - - @staticmethod - def _engine(workflow_func, workflow_context, executor, executor_kwargs=None): - graph = workflow_func(ctx=workflow_context) - return engine.Engine(executor=executor, - executor_kwargs=executor_kwargs, - workflow_context=workflow_context, - tasks_graph=graph) - - @staticmethod - def _op(ctx, - func, - arguments=None, - max_attempts=None, - retry_interval=None, - ignore_failure=None): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - interface_name = 'aria.interfaces.lifecycle' - operation_kwargs = dict(function='{name}.{func.__name__}'.format( - name=__name__, func=func)) - if arguments: - # the operation has to declare the arguments before those may be passed - operation_kwargs['arguments'] = arguments - operation_name = 'create' - interface = mock.models.create_interface(node.service, interface_name, operation_name, - operation_kwargs=operation_kwargs) - node.interfaces[interface.name] = interface - - return api.task.OperationTask( - node, - interface_name='aria.interfaces.lifecycle', - operation_name=operation_name, - arguments=arguments, - max_attempts=max_attempts, - retry_interval=retry_interval, - ignore_failure=ignore_failure, - ) - - @pytest.fixture(autouse=True) - def globals_cleanup(self): - try: - yield - finally: - global_test_holder.clear() - - @pytest.fixture(autouse=True) - def signals_registration(self, ): - def sent_task_handler(task, *args, **kwargs): - if task.stub_type is None: - calls = global_test_holder.setdefault('sent_task_signal_calls', 0) - global_test_holder['sent_task_signal_calls'] = calls + 1 - - def start_workflow_handler(workflow_context, *args, **kwargs): - workflow_context.states.append('start') - - def success_workflow_handler(workflow_context, *args, **kwargs): - workflow_context.states.append('success') - - def failure_workflow_handler(workflow_context, exception, *args, **kwargs): - workflow_context.states.append('failure') - workflow_context.exception = exception - - def cancel_workflow_handler(workflow_context, *args, **kwargs): - workflow_context.states.append('cancel') - - events.start_workflow_signal.connect(start_workflow_handler) - events.on_success_workflow_signal.connect(success_workflow_handler) - events.on_failure_workflow_signal.connect(failure_workflow_handler) - events.on_cancelled_workflow_signal.connect(cancel_workflow_handler) - events.sent_task_signal.connect(sent_task_handler) - try: - yield - finally: - events.start_workflow_signal.disconnect(start_workflow_handler) - events.on_success_workflow_signal.disconnect(success_workflow_handler) - events.on_failure_workflow_signal.disconnect(failure_workflow_handler) - events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) - events.sent_task_signal.disconnect(sent_task_handler) - - @pytest.fixture - def executor(self): - return thread.ThreadExecutor - - @pytest.fixture - def workflow_context(self, tmpdir): - workflow_context = mock.context.simple(str(tmpdir)) - workflow_context.states = [] - workflow_context.exception = None - yield workflow_context - storage.release_sqlite_storage(workflow_context.model) - - -class TestEngine(BaseTest): - - def test_empty_graph_execution(self, workflow_context, executor): - @workflow - def mock_workflow(**_): - pass - self._execute(workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert 'sent_task_signal_calls' not in global_test_holder - execution = workflow_context.execution - assert execution.started_at <= execution.ended_at <= datetime.utcnow() - assert execution.error is None - assert execution.status == models.Execution.SUCCEEDED - - def test_single_task_successful_execution(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - graph.add_tasks(self._op(ctx, func=mock_success_task)) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert global_test_holder.get('sent_task_signal_calls') == 1 - - def test_single_task_failed_execution(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - graph.add_tasks(self._op(ctx, func=mock_failed_task)) - with pytest.raises(exceptions.ExecutorException): - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'failure'] - assert isinstance(workflow_context.exception, exceptions.ExecutorException) - assert global_test_holder.get('sent_task_signal_calls') == 1 - execution = workflow_context.execution - assert execution.started_at <= execution.ended_at <= datetime.utcnow() - assert execution.error is not None - assert execution.status == models.Execution.FAILED - - def test_two_tasks_execution_order(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) - op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) - graph.sequence(op1, op2) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert global_test_holder.get('invocations') == [1, 2] - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_stub_and_subworkflow_execution(self, workflow_context, executor): - @workflow - def sub_workflow(ctx, graph): - op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) - op2 = api.task.StubTask() - op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) - graph.sequence(op1, op2, op3) - - @workflow - def mock_workflow(ctx, graph): - graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx)) - self._execute(workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert global_test_holder.get('invocations') == [1, 2] - assert global_test_holder.get('sent_task_signal_calls') == 2 - - -class TestCancel(BaseTest): - - def test_cancel_started_execution(self, workflow_context, executor): - number_of_tasks = 100 - - @workflow - def mock_workflow(ctx, graph): - operations = ( - self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1)) - for _ in range(number_of_tasks) - ) - return graph.sequence(*operations) - - eng = self._engine(workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - t = threading.Thread(target=eng.execute) - t.start() - time.sleep(10) - eng.cancel_execution() - t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow - assert not t.is_alive() # if join is timed out it will not raise an exception - assert workflow_context.states == ['start', 'cancel'] - assert workflow_context.exception is None - invocations = global_test_holder.get('invocations', []) - assert 0 < len(invocations) < number_of_tasks - execution = workflow_context.execution - assert execution.started_at <= execution.ended_at <= datetime.utcnow() - assert execution.error is None - assert execution.status == models.Execution.CANCELLED - - def test_cancel_pending_execution(self, workflow_context, executor): - @workflow - def mock_workflow(graph, **_): - return graph - eng = self._engine(workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - eng.cancel_execution() - execution = workflow_context.execution - assert execution.status == models.Execution.CANCELLED - - -class TestRetries(BaseTest): - - def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, - arguments={'failure_count': 1}, - max_attempts=2) - graph.add_tasks(op) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert len(global_test_holder.get('invocations', [])) == 2 - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, - arguments={'failure_count': 2}, - max_attempts=2) - graph.add_tasks(op) - with pytest.raises(exceptions.ExecutorException): - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'failure'] - assert isinstance(workflow_context.exception, exceptions.ExecutorException) - assert len(global_test_holder.get('invocations', [])) == 2 - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, - arguments={'failure_count': 1}, - max_attempts=3) - graph.add_tasks(op) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert len(global_test_holder.get('invocations', [])) == 2 - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, - arguments={'failure_count': 2}, - max_attempts=3) - graph.add_tasks(op) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert len(global_test_holder.get('invocations', [])) == 3 - assert global_test_holder.get('sent_task_signal_calls') == 3 - - def test_infinite_retries(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, - arguments={'failure_count': 1}, - max_attempts=-1) - graph.add_tasks(op) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - assert len(global_test_holder.get('invocations', [])) == 2 - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_retry_interval_float(self, workflow_context, executor): - self._test_retry_interval(retry_interval=0.3, - workflow_context=workflow_context, - executor=executor) - - def test_retry_interval_int(self, workflow_context, executor): - self._test_retry_interval(retry_interval=1, - workflow_context=workflow_context, - executor=executor) - - def _test_retry_interval(self, retry_interval, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, - arguments={'failure_count': 1}, - max_attempts=2, - retry_interval=retry_interval) - graph.add_tasks(op) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - invocations = global_test_holder.get('invocations', []) - assert len(invocations) == 2 - invocation1, invocation2 = invocations - assert invocation2 - invocation1 >= retry_interval - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_ignore_failure(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, - ignore_failure=True, - arguments={'failure_count': 100}, - max_attempts=100) - graph.add_tasks(op) - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'success'] - assert workflow_context.exception is None - invocations = global_test_holder.get('invocations', []) - assert len(invocations) == 1 - assert global_test_holder.get('sent_task_signal_calls') == 1 - - -class TestTaskRetryAndAbort(BaseTest): - message = 'EXPECTED_ERROR' - - def test_task_retry_default_interval(self, workflow_context, executor): - default_retry_interval = 0.1 - - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_retry, - arguments={'message': self.message}, - retry_interval=default_retry_interval, - max_attempts=2) - graph.add_tasks(op) - with pytest.raises(exceptions.ExecutorException): - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'failure'] - assert isinstance(workflow_context.exception, exceptions.ExecutorException) - invocations = global_test_holder.get('invocations', []) - assert len(invocations) == 2 - invocation1, invocation2 = invocations - assert invocation2 - invocation1 >= default_retry_interval - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_task_retry_custom_interval(self, workflow_context, executor): - default_retry_interval = 100 - custom_retry_interval = 0.1 - - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_retry, - arguments={'message': self.message, - 'retry_interval': custom_retry_interval}, - retry_interval=default_retry_interval, - max_attempts=2) - graph.add_tasks(op) - execution_start = time.time() - with pytest.raises(exceptions.ExecutorException): - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - execution_end = time.time() - assert workflow_context.states == ['start', 'failure'] - assert isinstance(workflow_context.exception, exceptions.ExecutorException) - invocations = global_test_holder.get('invocations', []) - assert len(invocations) == 2 - assert (execution_end - execution_start) < default_retry_interval - assert global_test_holder.get('sent_task_signal_calls') == 2 - - def test_task_abort(self, workflow_context, executor): - @workflow - def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_abort, - arguments={'message': self.message}, - retry_interval=100, - max_attempts=100) - graph.add_tasks(op) - with pytest.raises(exceptions.ExecutorException): - self._execute( - workflow_func=mock_workflow, - workflow_context=workflow_context, - executor=executor) - assert workflow_context.states == ['start', 'failure'] - assert isinstance(workflow_context.exception, exceptions.ExecutorException) - invocations = global_test_holder.get('invocations', []) - assert len(invocations) == 1 - assert global_test_holder.get('sent_task_signal_calls') == 1 - - -@operation -def mock_success_task(**_): - pass - - -@operation -def mock_failed_task(**_): - raise RuntimeError - - -@operation -def mock_ordered_task(counter, **_): - invocations = global_test_holder.setdefault('invocations', []) - invocations.append(counter) - - -@operation -def mock_conditional_failure_task(failure_count, **_): - invocations = global_test_holder.setdefault('invocations', []) - try: - if len(invocations) < failure_count: - raise RuntimeError - finally: - invocations.append(time.time()) - - -@operation -def mock_sleep_task(seconds, **_): - _add_invocation_timestamp() - time.sleep(seconds) - - -@operation -def mock_task_retry(ctx, message, retry_interval=None, **_): - _add_invocation_timestamp() - retry_kwargs = {} - if retry_interval is not None: - retry_kwargs['retry_interval'] = retry_interval - ctx.task.retry(message, **retry_kwargs) - - -@operation -def mock_task_abort(ctx, message, **_): - _add_invocation_timestamp() - ctx.task.abort(message) - - -def _add_invocation_timestamp(): - invocations = global_test_holder.setdefault('invocations', []) - invocations.append(time.time()) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py new file mode 100644 index 0000000..7ffb92a --- /dev/null +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -0,0 +1,519 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +import threading +from datetime import datetime + +import pytest + +from aria.orchestrator import ( + events, + workflow, + operation, +) +from aria.modeling import models +from aria.orchestrator.workflows import ( + api, + exceptions, +) +from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.executor import thread + +from tests import mock, storage + + +global_test_holder = {} + + +class BaseTest(object): + + @classmethod + def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None): + eng = cls._engine(workflow_func=workflow_func, + workflow_context=workflow_context, + executor=executor, + executor_kwargs=executor_kwargs) + eng.execute() + return eng + + @staticmethod + def _engine(workflow_func, workflow_context, executor, executor_kwargs=None): + graph = workflow_func(ctx=workflow_context) + return engine.Engine(executor=executor, + executor_kwargs=executor_kwargs, + workflow_context=workflow_context, + tasks_graph=graph) + + @staticmethod + def _op(ctx, + func, + arguments=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'aria.interfaces.lifecycle' + operation_kwargs = dict(function='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if arguments: + # the operation has to declare the arguments before those may be passed + operation_kwargs['arguments'] = arguments + operation_name = 'create' + interface = mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) + node.interfaces[interface.name] = interface + + return api.task.OperationTask( + node, + interface_name='aria.interfaces.lifecycle', + operation_name=operation_name, + arguments=arguments, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure, + ) + + @pytest.fixture(autouse=True) + def globals_cleanup(self): + try: + yield + finally: + global_test_holder.clear() + + @pytest.fixture(autouse=True) + def signals_registration(self, ): + def sent_task_handler(task, *args, **kwargs): + if task.stub_type is None: + calls = global_test_holder.setdefault('sent_task_signal_calls', 0) + global_test_holder['sent_task_signal_calls'] = calls + 1 + + def start_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('start') + + def success_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('success') + + def failure_workflow_handler(workflow_context, exception, *args, **kwargs): + workflow_context.states.append('failure') + workflow_context.exception = exception + + def cancel_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('cancel') + + events.start_workflow_signal.connect(start_workflow_handler) + events.on_success_workflow_signal.connect(success_workflow_handler) + events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.on_cancelled_workflow_signal.connect(cancel_workflow_handler) + events.sent_task_signal.connect(sent_task_handler) + try: + yield + finally: + events.start_workflow_signal.disconnect(start_workflow_handler) + events.on_success_workflow_signal.disconnect(success_workflow_handler) + events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler) + events.sent_task_signal.disconnect(sent_task_handler) + + @pytest.fixture + def executor(self): + return thread.ThreadExecutor + + @pytest.fixture + def workflow_context(self, tmpdir): + workflow_context = mock.context.simple(str(tmpdir)) + workflow_context.states = [] + workflow_context.exception = None + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + +class TestEngine(BaseTest): + + def test_empty_graph_execution(self, workflow_context, executor): + @workflow + def mock_workflow(**_): + pass + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert 'sent_task_signal_calls' not in global_test_holder + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.SUCCEEDED + + def test_single_task_successful_execution(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(ctx, func=mock_success_task)) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('sent_task_signal_calls') == 1 + + def test_single_task_failed_execution(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(self._op(ctx, func=mock_failed_task)) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert global_test_holder.get('sent_task_signal_calls') == 1 + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is not None + assert execution.status == models.Execution.FAILED + + def test_two_tasks_execution_order(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) + op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) + graph.sequence(op1, op2) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_stub_and_subworkflow_execution(self, workflow_context, executor): + @workflow + def sub_workflow(ctx, graph): + op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) + op2 = api.task.StubTask() + op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) + graph.sequence(op1, op2, op3) + + @workflow + def mock_workflow(ctx, graph): + graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx)) + self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert global_test_holder.get('invocations') == [1, 2] + assert global_test_holder.get('sent_task_signal_calls') == 2 + + +class TestCancel(BaseTest): + + def test_cancel_started_execution(self, workflow_context, executor): + number_of_tasks = 100 + + @workflow + def mock_workflow(ctx, graph): + operations = ( + self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1)) + for _ in range(number_of_tasks) + ) + return graph.sequence(*operations) + + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + t = threading.Thread(target=eng.execute) + t.start() + time.sleep(10) + eng.cancel_execution() + t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow + assert not t.is_alive() # if join is timed out it will not raise an exception + assert workflow_context.states == ['start', 'cancel'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert 0 < len(invocations) < number_of_tasks + execution = workflow_context.execution + assert execution.started_at <= execution.ended_at <= datetime.utcnow() + assert execution.error is None + assert execution.status == models.Execution.CANCELLED + + def test_cancel_pending_execution(self, workflow_context, executor): + @workflow + def mock_workflow(graph, **_): + return graph + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + eng.cancel_execution() + execution = workflow_context.execution + assert execution.status == models.Execution.CANCELLED + + +class TestRetries(BaseTest): + + def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_conditional_failure_task, + arguments={'failure_count': 1}, + max_attempts=2) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_conditional_failure_task, + arguments={'failure_count': 2}, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_conditional_failure_task, + arguments={'failure_count': 1}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_conditional_failure_task, + arguments={'failure_count': 2}, + max_attempts=3) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 3 + assert global_test_holder.get('sent_task_signal_calls') == 3 + + def test_infinite_retries(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_conditional_failure_task, + arguments={'failure_count': 1}, + max_attempts=-1) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + assert len(global_test_holder.get('invocations', [])) == 2 + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_retry_interval_float(self, workflow_context, executor): + self._test_retry_interval(retry_interval=0.3, + workflow_context=workflow_context, + executor=executor) + + def test_retry_interval_int(self, workflow_context, executor): + self._test_retry_interval(retry_interval=1, + workflow_context=workflow_context, + executor=executor) + + def _test_retry_interval(self, retry_interval, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_conditional_failure_task, + arguments={'failure_count': 1}, + max_attempts=2, + retry_interval=retry_interval) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_ignore_failure(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_conditional_failure_task, + ignore_failure=True, + arguments={'failure_count': 100}, + max_attempts=100) + graph.add_tasks(op) + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'success'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + +class TestTaskRetryAndAbort(BaseTest): + message = 'EXPECTED_ERROR' + + def test_task_retry_default_interval(self, workflow_context, executor): + default_retry_interval = 0.1 + + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_task_retry, + arguments={'message': self.message}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + invocation1, invocation2 = invocations + assert invocation2 - invocation1 >= default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_retry_custom_interval(self, workflow_context, executor): + default_retry_interval = 100 + custom_retry_interval = 0.1 + + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_task_retry, + arguments={'message': self.message, + 'retry_interval': custom_retry_interval}, + retry_interval=default_retry_interval, + max_attempts=2) + graph.add_tasks(op) + execution_start = time.time() + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + execution_end = time.time() + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 2 + assert (execution_end - execution_start) < default_retry_interval + assert global_test_holder.get('sent_task_signal_calls') == 2 + + def test_task_abort(self, workflow_context, executor): + @workflow + def mock_workflow(ctx, graph): + op = self._op(ctx, func=mock_task_abort, + arguments={'message': self.message}, + retry_interval=100, + max_attempts=100) + graph.add_tasks(op) + with pytest.raises(exceptions.ExecutorException): + self._execute( + workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + assert workflow_context.states == ['start', 'failure'] + assert isinstance(workflow_context.exception, exceptions.ExecutorException) + invocations = global_test_holder.get('invocations', []) + assert len(invocations) == 1 + assert global_test_holder.get('sent_task_signal_calls') == 1 + + +@operation +def mock_success_task(**_): + pass + + +@operation +def mock_failed_task(**_): + raise RuntimeError + + +@operation +def mock_ordered_task(counter, **_): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(counter) + + +@operation +def mock_conditional_failure_task(failure_count, **_): + invocations = global_test_holder.setdefault('invocations', []) + try: + if len(invocations) < failure_count: + raise RuntimeError + finally: + invocations.append(time.time()) + + +@operation +def mock_sleep_task(seconds, **_): + _add_invocation_timestamp() + time.sleep(seconds) + + +@operation +def mock_task_retry(ctx, message, retry_interval=None, **_): + _add_invocation_timestamp() + retry_kwargs = {} + if retry_interval is not None: + retry_kwargs['retry_interval'] = retry_interval + ctx.task.retry(message, **retry_kwargs) + + +@operation +def mock_task_abort(ctx, message, **_): + _add_invocation_timestamp() + ctx.task.abort(message) + + +def _add_invocation_timestamp(): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(time.time()) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index 6d542e9..d63a8ef 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ -113,7 +113,7 @@ def run_operation_on_node(ctx, op_name, interface_name): operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) node.interfaces[interface.name] = interface - eng = engine.Engine(executor=ThreadExecutor(), + eng = engine.Engine(executor=ThreadExecutor, workflow_context=ctx, tasks_graph=single_operation_workflow(ctx=ctx, # pylint: disable=no-value-for-parameter node=node, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index ac6d325..07fb8ad 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -34,7 +34,7 @@ class MockTask(object): self.exception = None self.id = str(uuid.uuid4()) self.logger = logging.getLogger() - self.context = MockContext(storage) + self.context = MockContext(self.id, storage) self.attempts_count = 1 self.max_attempts = 1 self.ignore_failure = False @@ -46,34 +46,32 @@ class MockTask(object): for state in models.Task.STATES: setattr(self, state.upper(), state) - @contextmanager - def _update(self): - yield self - class MockContext(object): - def __init__(self, storage=None): + def __init__(self, task_id, storage=None): self.logger = logging.getLogger('mock_logger') - self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) + self.task_id = task_id self.model = storage @property def serialization_dict(self): if self.model: - return {'context': self.model.serialization_dict, 'context_cls': self.__class__} + context = self.model.serialization_dict + context['task_id'] = self.task_id + return {'context': context, 'context_cls': self.__class__} else: - return {'context_cls': self.__class__, 'context': {}} + return {'context_cls': self.__class__, 'context': {'task': self.task_id}} def __getattr__(self, item): return None @classmethod - def instantiate_from_dict(cls, **kwargs): + def instantiate_from_dict(cls, task_id, **kwargs): if kwargs: - return cls(storage=aria.application_model_storage(**kwargs)) + return cls(task_id=task_id, storage=aria.application_model_storage(**kwargs)) else: - return cls() + return cls(task=task_id) @staticmethod def close(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62fa985c/tests/orchestrator/workflows/executor/test_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py index 3079c60..cfb6975 100644 --- a/tests/orchestrator/workflows/executor/test_executor.py +++ b/tests/orchestrator/workflows/executor/test_executor.py @@ -103,7 +103,7 @@ def storage(tmpdir): @pytest.fixture(params=[ (thread.ThreadExecutor, {'pool_size': 1}), - (thread.ThreadExecutor, {'pool_size': 2}), + # (thread.ThreadExecutor, {'pool_size': 2}), # subprocess needs to load a tests module so we explicitly add the root directory as if # the project has been installed in editable mode # (celery.CeleryExecutor, {'app': app})
