Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-11-wf-cancel [created] a4b7894a5
ARIA-11 Add cancel workflow execution support Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a4b7894a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a4b7894a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a4b7894a Branch: refs/heads/ARIA-11-wf-cancel Commit: a4b7894a5dc84d3b35db9d5bbf7787533eaa2f54 Parents: 9a44178 Author: Dan Kilman <[email protected]> Authored: Wed Nov 9 16:37:43 2016 +0200 Committer: Dan Kilman <[email protected]> Committed: Wed Nov 9 18:48:59 2016 +0200 ---------------------------------------------------------------------- aria/context/workflow.py | 14 +++++ aria/events/__init__.py | 2 + aria/events/builtin_event_handler.py | 34 +++++++---- aria/events/workflow_engine_event_handler.py | 14 ++++- aria/storage/models.py | 4 +- aria/workflows/core/engine.py | 21 ++++++- tests/context/__init__.py | 14 +++++ tests/context/test_workflow.py | 59 +++++++++++++++++++ tests/mock/context.py | 1 + tests/workflows/core/test_engine.py | 71 +++++++++++++++++++++-- 10 files changed, 213 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index 44db38d..9c5abdc 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -62,6 +62,20 @@ class WorkflowContext(logger.LoggerMixin): self.task_max_retries = task_max_retries self.task_retry_interval = task_retry_interval + try: + self.model.execution.get(self.execution_id) + except exceptions.StorageError: + execution_cls = self.model.execution.model_cls + execution = self.model.execution.model_cls( + id=self.execution_id, + deployment_id=self.deployment_id, + 
workflow_id=self.workflow_id, + blueprint_id=self.blueprint_id, + status=execution_cls.PENDING, + parameters=self.parameters, + ) + self.model.execution.store(execution) + def __repr__(self): return ( '{name}(deployment_id={self.deployment_id}, ' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/events/__init__.py ---------------------------------------------------------------------- diff --git a/aria/events/__init__.py b/aria/events/__init__.py index 74f3e22..a158308 100644 --- a/aria/events/__init__.py +++ b/aria/events/__init__.py @@ -46,6 +46,8 @@ on_failure_task_signal = signal('failure_task_signal') # workflow engine workflow signals: start_workflow_signal = signal('start_workflow_signal') +on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') +on_cancel_workflow_signal = signal('on_cancel_workflow_signal') on_success_workflow_signal = signal('on_success_workflow_signal') on_failure_workflow_signal = signal('on_failure_workflow_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index b54024b..11ee744 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -30,6 +30,8 @@ from . 
import ( start_workflow_signal, on_success_workflow_signal, on_failure_workflow_signal, + on_cancel_workflow_signal, + on_cancelling_workflow_signal, sent_task_signal, start_task_signal, on_success_task_signal, @@ -71,16 +73,9 @@ def _task_succeeded(task, *args, **kwargs): @start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): - execution_cls = workflow_context.model.execution.model_cls - execution = execution_cls( - id=workflow_context.execution_id, - deployment_id=workflow_context.deployment_id, - workflow_id=workflow_context.workflow_id, - blueprint_id=workflow_context.blueprint_id, - status=execution_cls.PENDING, - started_at=datetime.utcnow(), - parameters=workflow_context.parameters, - ) + execution = workflow_context.execution + execution.status = execution.STARTED + execution.started_at = datetime.utcnow() workflow_context.execution = execution @@ -99,3 +94,22 @@ def _workflow_succeeded(workflow_context, *args, **kwargs): execution.status = execution.TERMINATED execution.ended_at = datetime.utcnow() workflow_context.execution = execution + + +@on_cancel_workflow_signal.connect +def _workflow_cancelled(workflow_context, *args, **kwargs): + execution = workflow_context.execution + execution.status = execution.CANCELLED + execution.ended_at = datetime.utcnow() + workflow_context.execution = execution + + +@on_cancelling_workflow_signal.connect +def _workflow_cancelling(workflow_context, *args, **kwargs): + execution = workflow_context.execution + # TODO: handle concurrency, locks may not be enough as this may + # be modified somewhere else entirely + if execution.status == execution.PENDING: + return _workflow_cancelled(workflow_context=workflow_context) + execution.status = execution.CANCELLING + workflow_context.execution = execution http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/events/workflow_engine_event_handler.py ---------------------------------------------------------------------- diff 
--git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py index 2f74ded..4067674 100644 --- a/aria/events/workflow_engine_event_handler.py +++ b/aria/events/workflow_engine_event_handler.py @@ -27,7 +27,9 @@ from . import ( on_failure_task_signal, start_workflow_signal, on_success_workflow_signal, - on_failure_workflow_signal + on_failure_workflow_signal, + on_cancel_workflow_signal, + on_cancelling_workflow_signal, ) @@ -60,3 +62,13 @@ def _failure_workflow_handler(context, **kwargs): @on_success_workflow_signal.connect def _success_workflow_handler(context, **kwargs): context.logger.debug('Event: Workflow success: {context.name}'.format(context=context)) + + +@on_cancel_workflow_signal.connect +def _cancel_workflow_handler(context, **kwargs): + context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context)) + + +@on_cancelling_workflow_signal.connect +def _cancelling_workflow_handler(context, **kwargs): + context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index a3686de..eb943aa 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -191,7 +191,6 @@ class Execution(Model): PENDING = 'pending' STARTED = 'started' CANCELLING = 'cancelling' - FORCE_CANCELLING = 'force_cancelling' STATES = ( TERMINATED, FAILED, @@ -199,7 +198,6 @@ class Execution(Model): PENDING, STARTED, CANCELLING, - FORCE_CANCELLING, ) END_STATES = [TERMINATED, FAILED, CANCELLED] ACTIVE_STATES = [state for state in STATES if state not in END_STATES] @@ -209,7 +207,7 @@ class Execution(Model): deployment_id = Field(type=basestring) workflow_id = Field(type=basestring) blueprint_id = Field(type=basestring) - started_at = Field(type=datetime) + started_at = 
Field(type=datetime, default=None) ended_at = Field(type=datetime, default=None) error = Field(type=basestring, default=None) parameters = Field() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py index 3ed137c..c4a049c 100644 --- a/aria/workflows/core/engine.py +++ b/aria/workflows/core/engine.py @@ -48,7 +48,7 @@ class Engine(logger.LoggerMixin): """ try: events.start_workflow_signal.send(self._workflow_context) - while True: + while not self._is_cancelling(): for task in self._ended_tasks(): self._handle_ended_tasks(task) for task in self._executable_tasks(): @@ -57,11 +57,28 @@ class Engine(logger.LoggerMixin): break else: time.sleep(0.1) - events.on_success_workflow_signal.send(self._workflow_context) + if self._is_cancelling(): + events.on_cancel_workflow_signal.send(self._workflow_context) + else: + events.on_success_workflow_signal.send(self._workflow_context) except BaseException as e: events.on_failure_workflow_signal.send(self._workflow_context, exception=e) raise + def cancel_request(self): + """ + Send a cancel request to the engine. If execution already started, execution status + will be modified to 'cancelling' status. If execution is in pending mode, execution status + will be modified to 'cancelled' directly. 
If execution is in one of its ended states, an + AriaEngineError will be raised """ + if self._workflow_context.execution.status not in models.Execution.ACTIVE_STATES: + raise exceptions.AriaEngineError('Execution already ended') + events.on_cancelling_workflow_signal.send(self._workflow_context) + + def _is_cancelling(self): + return self._workflow_context.execution.status == models.Execution.CANCELLING + + def _executable_tasks(self): + now = datetime.utcnow() + return (task for task in self._tasks_iter() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/context/__init__.py b/tests/context/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/context/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/context/test_workflow.py ---------------------------------------------------------------------- diff --git a/tests/context/test_workflow.py b/tests/context/test_workflow.py new file mode 100644 index 0000000..2e19aa2 --- /dev/null +++ b/tests/context/test_workflow.py @@ -0,0 +1,59 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest + +from aria import context, application_model_storage + +from ..mock import models +from ..storage import InMemoryModelDriver + + +class TestWorkflowContext(object): + + def test_execution_creation_on_workflow_context_creation(self, storage): + self._create_ctx(storage) + execution = storage.execution.get(models.EXECUTION_ID) + assert execution.id == models.EXECUTION_ID + assert execution.deployment_id == models.DEPLOYMENT_ID + assert execution.workflow_id == models.WORKFLOW_ID + assert execution.blueprint_id == models.BLUEPRINT_ID + assert execution.status == storage.execution.model_cls.PENDING + assert execution.parameters == {} + + def test_subsequent_workflow_context_creation_do_not_fail(self, storage): + self._create_ctx(storage) + self._create_ctx(storage) + + @staticmethod + def _create_ctx(storage): + return context.workflow.WorkflowContext( + name='simple_context', + model_storage=storage, + resource_storage=None, + deployment_id=models.DEPLOYMENT_ID, + workflow_id=models.WORKFLOW_ID, + execution_id=models.EXECUTION_ID, + task_max_retries=models.TASK_MAX_RETRIES, + task_retry_interval=models.TASK_RETRY_INTERVAL + ) + + [email protected](scope='function') +def storage(): + result = application_model_storage(InMemoryModelDriver()) + result.setup() + result.deployment.store(models.get_deployment()) + return result http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/mock/context.py ---------------------------------------------------------------------- diff --git a/tests/mock/context.py b/tests/mock/context.py index 13020f3..bef2437 100644 --- a/tests/mock/context.py +++ b/tests/mock/context.py @@ -22,6 +22,7 @@ from ..storage import InMemoryModelDriver def simple(): storage = application_model_storage(InMemoryModelDriver()) storage.setup() + storage.deployment.store(models.get_deployment()) return context.workflow.WorkflowContext( name='simple_context', model_storage=storage, 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py index d0d41f3..748281e 100644 --- a/tests/workflows/core/test_engine.py +++ b/tests/workflows/core/test_engine.py @@ -14,6 +14,7 @@ # limitations under the License. import time +import threading from datetime import datetime import pytest @@ -37,11 +38,20 @@ global_test_holder = {} class BaseTest(object): + @classmethod + def _execute(cls, workflow_func, workflow_context, executor): + eng = cls._engine(workflow_func=workflow_func, + workflow_context=workflow_context, + executor=executor) + eng.execute() + return eng + @staticmethod - def _execute(workflow_func, workflow_context, executor): + def _engine(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) - eng.execute() + return engine.Engine(executor=executor, + workflow_context=workflow_context, + tasks_graph=graph) @staticmethod def _op(func, ctx, inputs=None, max_retries=None, retry_interval=None): @@ -78,9 +88,13 @@ class BaseTest(object): workflow_context.states.append('failure') workflow_context.exception = exception + def cancel_workflow_handler(workflow_context, *args, **kwargs): + workflow_context.states.append('cancel') + events.start_workflow_signal.connect(start_workflow_handler) events.on_success_workflow_signal.connect(success_workflow_handler) events.on_failure_workflow_signal.connect(failure_workflow_handler) + events.on_cancel_workflow_signal.connect(cancel_workflow_handler) events.sent_task_signal.connect(sent_task_handler) try: yield @@ -88,6 +102,7 @@ class BaseTest(object): events.start_workflow_signal.disconnect(start_workflow_handler) 
events.on_success_workflow_signal.disconnect(success_workflow_handler) events.on_failure_workflow_signal.disconnect(failure_workflow_handler) + events.on_cancel_workflow_signal.disconnect(cancel_workflow_handler) events.sent_task_signal.disconnect(sent_task_handler) @pytest.fixture(scope='function') @@ -189,16 +204,56 @@ class TestEngine(BaseTest): @workflow def mock_workflow(ctx, graph): graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx)) - self._execute(workflow_func=mock_workflow, workflow_context=workflow_context, executor=executor) - assert workflow_context.states == ['start', 'success'] assert workflow_context.exception is None assert global_test_holder.get('invocations') == [1, 2] assert global_test_holder.get('sent_task_signal_calls') == 2 + def test_cancel_started_execution(self, workflow_context, executor): + number_of_tasks = 100 + + @workflow + def mock_workflow(ctx, graph): + return graph.sequence(*(self._op(mock_sleep_task, ctx, inputs={'seconds': 0.1}) + for _ in range(number_of_tasks))) + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + t = threading.Thread(target=eng.execute) + t.start() + time.sleep(1) + eng.cancel_request() + t.join(timeout=30) + assert workflow_context.states == ['start', 'cancel'] + assert workflow_context.exception is None + invocations = global_test_holder.get('invocations', []) + assert 0 < len(invocations) < number_of_tasks + + def test_invalid_cancel_ended_execution(self, workflow_context, executor): + @workflow + def mock_workflow(**_): + pass + eng = self._execute(workflow_func=mock_workflow, + workflow_context=workflow_context, + executor=executor) + with pytest.raises(exceptions.AriaEngineError): + eng.cancel_request() + + def test_cancel_pending_execution(self, workflow_context, executor): + @workflow + def mock_workflow(graph, **_): + return graph + eng = self._engine(workflow_func=mock_workflow, + workflow_context=workflow_context, + 
executor=executor) + eng.cancel_request() + # sanity to verify previous cancel request actually did something + with pytest.raises(exceptions.AriaEngineError): + eng.cancel_request() + class TestRetries(BaseTest): @@ -334,3 +389,9 @@ def mock_conditional_failure_task(failure_count): raise RuntimeError finally: invocations.append(time.time()) + + +def mock_sleep_task(seconds): + invocations = global_test_holder.setdefault('invocations', []) + invocations.append(time.time()) + time.sleep(seconds)
