Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-236-Resumable-workflow-executions 507796e69 -> 9431465dd
wip Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9431465d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9431465d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9431465d Branch: refs/heads/ARIA-236-Resumable-workflow-executions Commit: 9431465ddcf59de37f9c984e8b600bc9ab81b82f Parents: 507796e Author: max-orlov <[email protected]> Authored: Mon Jun 19 17:44:45 2017 +0300 Committer: max-orlov <[email protected]> Committed: Mon Jun 19 17:44:45 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 3 +- aria/orchestrator/context/workflow.py | 5 + aria/orchestrator/events.py | 1 + aria/orchestrator/workflow_runner.py | 38 +++-- aria/orchestrator/workflows/core/engine.py | 4 + .../workflows/core/events_handler.py | 7 + tests/mock/models.py | 14 +- tests/orchestrator/test_workflow_runner.py | 149 ++++++++++++++++++- 8 files changed, 200 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 007eefa..7751eb0 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -68,7 +68,8 @@ class ExecutionBase(mixins.ModelMixin): VALID_TRANSITIONS = { PENDING: (STARTED, CANCELLED), STARTED: END_STATES + (CANCELLING,), - CANCELLING: END_STATES + CANCELLING: END_STATES, + CANCELLED: PENDING } @orm.validates('status') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py 
b/aria/orchestrator/context/workflow.py index 4b7573f..97db8a9 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -97,10 +97,15 @@ class WorkflowContext(BaseContext): @property def _graph(self): + # Constructing a graph with only not ended nodes if self._execution_graph is None: graph = DiGraph() for task in self.execution.tasks: + if task.has_ended(): + continue for dependency in task.dependencies: + if dependency.has_ended(): + continue graph.add_edge(dependency, task) self._execution_graph = graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/aria/orchestrator/events.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py index a1c4922..a1d7006 100644 --- a/aria/orchestrator/events.py +++ b/aria/orchestrator/events.py @@ -34,3 +34,4 @@ on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal') on_success_workflow_signal = signal('on_success_workflow_signal') on_failure_workflow_signal = signal('on_failure_workflow_signal') +on_resume_workflow_signal = signal('on_resume_workflow_signal') \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index c30ec4b..995d325 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -37,9 +37,9 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): - def __init__(self, workflow_name, service_id, inputs, - model_storage, resource_storage, plugin_manager, - executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, + def __init__(self, service_id, inputs, model_storage, resource_storage, 
plugin_manager, + execution_id=None, workflow_name=None, executor=None, + task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): """ Manages a single workflow execution on a given service. @@ -57,25 +57,30 @@ class WorkflowRunner(object): self._model_storage = model_storage self._resource_storage = resource_storage - self._workflow_name = workflow_name # the IDs are stored rather than the models themselves, so this module could be used # by several threads without raising errors on model objects shared between threads self._service_id = service_id - self._validate_workflow_exists_for_service() - - workflow_fn = self._get_workflow_fn() - - execution = self._create_execution_model(inputs) - self._execution_id = execution.id + if workflow_name is not None: + assert execution_id is None + self._workflow_name = workflow_name + execution = self._create_execution_model(inputs) + self._execution_id = execution.id + self._validate_workflow_exists_for_service() + workflow_fn = self._get_workflow_fn() + else: + assert execution_id is not None + self._execution_id = execution_id + self._workflow_name = \ + self._model_storage.execution.get(self._execution_id).workflow_name self._workflow_context = WorkflowContext( name=self.__class__.__name__, model_storage=self._model_storage, resource_storage=resource_storage, service_id=service_id, - execution_id=execution.id, + execution_id=self._execution_id, workflow_name=workflow_name, task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval) @@ -86,14 +91,19 @@ class WorkflowRunner(object): # transforming the execution inputs to dict, to pass them to the workflow function execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values()) - self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - engine.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__) + if execution_id is None: + # Not an existing 
execution + tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) + engine.construct_execution_tasks(self.execution, tasks_graph, executor.__class__) # Update the state - self._model_storage.execution.update(execution) + self._model_storage.execution.update(self.execution) self._engine = engine.Engine(default_executor=executor) + if workflow_name is None: + self._engine.resume_execution(self._workflow_context) + @property def execution_id(self): return self._execution_id http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index cec561f..a403e35 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -67,6 +67,10 @@ class Engine(logger.LoggerMixin): raise @staticmethod + def resume_execution(ctx): + events.on_resume_workflow_signal.send(ctx) + + @staticmethod def cancel_execution(ctx): """ Send a cancel request to the engine. 
If execution already started, execution status http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 3a780d5..64baf79 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -121,6 +121,13 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): execution.ended_at = datetime.utcnow() +@events.on_resume_workflow_signal.connect +def _workflow_resume(workflow_context, *args, **kwargs): + with workflow_context.track_changes: + execution = workflow_context.execution + execution.status = execution.PENDING + + @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): with workflow_context.track_changes: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/tests/mock/models.py ---------------------------------------------------------------------- diff --git a/tests/mock/models.py b/tests/mock/models.py index 7f6bbea..23a14bd 100644 --- a/tests/mock/models.py +++ b/tests/mock/models.py @@ -225,20 +225,24 @@ def create_interface_template(service_template, interface_name, operation_name, ) -def create_interface(service, interface_name, operation_name, operation_kwargs=None, - interface_kwargs=None): - the_type = service.service_template.interface_types.get_descendant('test_interface_type') - +def create_operation(operation_name, operation_kwargs=None): if operation_kwargs and operation_kwargs.get('arguments'): operation_kwargs['arguments'] = dict( (argument_name, models.Argument.wrap(argument_name, argument_value)) for argument_name, argument_value in operation_kwargs['arguments'].iteritems() if argument_value is not None) - operation = models.Operation( + return
models.Operation( name=operation_name, **(operation_kwargs or {}) ) + + +def create_interface(service, interface_name, operation_name, operation_kwargs=None, + interface_kwargs=None): + the_type = service.service_template.interface_types.get_descendant('test_interface_type') + operation = create_operation(operation_name, operation_kwargs) + return models.Interface( type=the_type, operations=_dictify(operation), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9431465d/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index c2312b1..df4638a 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -14,21 +14,32 @@ # limitations under the License. import json +from threading import Thread from datetime import datetime -import pytest import mock +import pytest from aria.modeling import exceptions as modeling_exceptions from aria.modeling import models from aria.orchestrator import exceptions from aria.orchestrator.workflow_runner import WorkflowRunner from aria.orchestrator.workflows.executor.process import ProcessExecutor +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.executor import thread +from aria.orchestrator import ( + workflow, + operation, +) from ..mock import ( topology, workflow as workflow_mocks ) + +from tests import mock, storage + from ..fixtures import ( # pylint: disable=unused-import plugins_dir, plugin_manager, @@ -36,6 +47,8 @@ from ..fixtures import ( # pylint: disable=unused-import resource_storage as resource ) +global_test_holder = {} + def test_undeclared_workflow(request): # validating a proper error is raised when the workflow is not declared in the service @@ -293,3 +306,137 @@ def _create_workflow_runner(request, workflow_name, 
inputs=None, executor=None, resource_storage=resource, plugin_manager=plugin_manager, **task_configuration_kwargs) + + +class TestResumableWorkflows(object): + + def test_single_task_successful_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_success_task) + + workflow_context.service.workflows['custom_workflow'] = mock.models.create_operation( + 'custom_workflow', + operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)} + ) + workflow_context.model.service.update(workflow_context.service) + + wf_runner = WorkflowRunner( + service_id=workflow_context.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + workflow_name='custom_workflow', + executor=executor) + wf_thread = Thread(target=wf_runner.execute) + try: + wf_thread.start() + + # Wait for the execution to start + while global_test_holder.get('state') != 'active': + pass + global_test_holder['state'] = 'terminated' + wf_runner.cancel() + + # Make sure the execution was canceled and the task has not ended + while wf_runner.execution.status != workflow_context.execution.CANCELLED: + pass + task = workflow_context.model.task.list(filters={'stub_type': None})[0] + assert task.status in (task.FAILED, task.RETRYING) + assert global_test_holder['state'] == 'idle' + + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=executor) + + global_test_holder['resumed'] = True + new_wf_runner.execute() + + while global_test_holder.get('state') != 'ended': + pass + assert task.status == task.SUCCESS + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + except BaseException: + global_test_holder['state'] = 'terminated' + wf_thread.join(5) + + 
@staticmethod + @pytest.fixture + def executor(): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @staticmethod + @pytest.fixture + def workflow_context(tmpdir): + workflow_context = mock.context.simple(str(tmpdir)) + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + @staticmethod + def _create_interface(ctx, func, arguments=None): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'aria.interfaces.lifecycle' + operation_kwargs = dict(function='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if arguments: + # the operation has to declare the arguments before those may be passed + operation_kwargs['arguments'] = arguments + operation_name = 'create' + interface = mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + return node, interface_name, operation_name + + @staticmethod + def _engine(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + execution = workflow_context.execution + engine.construct_execution_tasks(execution, graph, executor.__class__) + workflow_context.execution = execution + + return engine.Engine(default_executor=executor) + + +@workflow +def mock_workflow(ctx, graph): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + graph.add_tasks(_op(node, 'create')) + + +@operation +def mock_success_task(**_): + global_test_holder['state'] = 'active' + while global_test_holder.get('state') != 'terminated': + if global_test_holder.get('resumed') is True: + global_test_holder['state'] = 'ended' + return + global_test_holder['state'] = 'idle' + raise Exception("The operation was terminated") + + +def _op(node, + operation_name, + arguments=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): + + return 
api.task.OperationTask( + node, + interface_name='aria.interfaces.lifecycle', + operation_name=operation_name, + arguments=arguments, + max_attempts=max_attempts, + retry_interval=retry_interval, + ignore_failure=ignore_failure, + ) \ No newline at end of file
