Improve execution cancelling and implement force-cancelling
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0da51978 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0da51978 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0da51978 Branch: refs/heads/ARIA-143-improve-cancelling-of-workflow-execution Commit: 0da519783567ec5ceee20297893ff235a13cb9e1 Parents: b446643 Author: Avia Efrat <[email protected]> Authored: Tue Apr 25 15:00:03 2017 +0300 Committer: Avia Efrat <[email protected]> Committed: Tue Apr 25 15:00:03 2017 +0300 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 2 +- aria/modeling/orchestration.py | 13 +++--- aria/orchestrator/events.py | 1 + aria/orchestrator/workflow_runner.py | 3 ++ aria/orchestrator/workflows/core/engine.py | 11 ++++- .../workflows/core/events_handler.py | 48 ++++++++++++++++---- tests/modeling/test_models.py | 22 ++++----- tests/orchestrator/test_workflow_runner.py | 2 +- .../orchestrator/workflows/core/test_engine.py | 2 +- 9 files changed, 73 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 6a1f02a..343ce6f 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -179,4 +179,4 @@ def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator): execution_thread.join(1) except KeyboardInterrupt: logger.info('Force-cancelling execution') - # TODO handle execution (update status etc.) and exit process + workflow_runner.force_cancel() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index b9a75e9..f49591d 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -56,21 +56,22 @@ class ExecutionBase(ModelMixin): __private_fields__ = ['service_fk', 'service_template'] - TERMINATED = 'terminated' + SUCCEEDED = 'succeeded' FAILED = 'failed' CANCELLED = 'cancelled' PENDING = 'pending' STARTED = 'started' CANCELLING = 'cancelling' - FORCE_CANCELLING = 'force_cancelling' + FORCE_CANCELLED = 'force_cancelled' - STATES = (TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING) - END_STATES = (TERMINATED, FAILED, CANCELLED) + STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLED) + END_STATES = (SUCCEEDED, FAILED, CANCELLED, FORCE_CANCELLED) + CANCEL_STATES = (CANCELLING, CANCELLED, FORCE_CANCELLED) VALID_TRANSITIONS = { - PENDING: (STARTED, CANCELLED), + PENDING: (STARTED, CANCELLED, FORCE_CANCELLED), STARTED: END_STATES + (CANCELLING,), - CANCELLING: END_STATES + (FORCE_CANCELLING,) + CANCELLING: END_STATES } @orm.validates('status') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/events.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py index a1c4922..bcc0627 100644 --- a/aria/orchestrator/events.py +++ b/aria/orchestrator/events.py @@ -32,5 +32,6 @@ on_failure_task_signal = signal('failure_task_signal') start_workflow_signal = signal('start_workflow_signal') on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal') on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal') +on_force_cancelled_workflow_signal = signal('on_force_cancelled_workflow_signal') on_success_workflow_signal = signal('on_success_workflow_signal') on_failure_workflow_signal = signal('on_failure_workflow_signal') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 8f25cce..32cf02d 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -107,6 +107,9 @@ class WorkflowRunner(object): def cancel(self): self._engine.cancel_execution() + def force_cancel(self): + self._engine.force_cancel_execution() + def _create_execution_model(self, inputs): execution = models.Execution( created_at=datetime.utcnow(), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 155d0ee..18fabed 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -51,6 +51,11 @@ class Engine(logger.LoggerMixin): execute the workflow """ try: + import os + print os.getpid() + print "going to sleep" + time.sleep(10) + print "woke up from sleep" events.start_workflow_signal.send(self._workflow_context) while True: cancel = self._is_cancel() @@ -81,9 +86,11 @@ class Engine(logger.LoggerMixin): """ events.on_cancelling_workflow_signal.send(self._workflow_context) + def force_cancel_execution(self): + events.on_force_cancelled_workflow_signal.send(self._workflow_context) + def _is_cancel(self): - return self._workflow_context.execution.status in (models.Execution.CANCELLING, - models.Execution.CANCELLED) + return self._workflow_context.execution.status in models.Execution.CANCEL_STATES def _executable_tasks(self): now = datetime.utcnow() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 7f61bfa..f4fd6b2 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -28,6 +28,9 @@ from datetime import ( from ... import events from ... import exceptions +EXECUTION_ALREADY_ENDED_TEMPLATE = "'{workflow_name}' workflow execution {status} " \ + "before the cancel request was received'" + @events.sent_task_signal.connect def _task_sent(task, *args, **kwargs): @@ -81,6 +84,9 @@ def _task_succeeded(task, *args, **kwargs): @events.start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): execution = workflow_context.execution + # the execution may already be in a cancelling process + if execution.status in execution.CANCEL_STATES: + return execution.status = execution.STARTED execution.started_at = datetime.utcnow() workflow_context.execution = execution @@ -98,7 +104,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs): @events.on_success_workflow_signal.connect def _workflow_succeeded(workflow_context, *args, **kwargs): execution = workflow_context.execution - execution.status = execution.TERMINATED + execution.status = execution.SUCCEEDED execution.ended_at = datetime.utcnow() workflow_context.execution = execution @@ -106,22 +112,40 @@ def _workflow_succeeded(workflow_context, *args, **kwargs): @events.on_cancelled_workflow_signal.connect def _workflow_cancelled(workflow_context, *args, **kwargs): execution = workflow_context.execution - # _workflow_cancelling function may have called this function - # already - if execution.status == execution.CANCELLED: + status = execution.status + # _workflow_cancelling function may have called this function already + if status == execution.CANCELLED: return - execution.status = execution.CANCELLED - execution.ended_at = datetime.utcnow() + # the execution may have already been finished + elif status == execution.SUCCEEDED or status == execution.FAILED: + _log_execution_already_ended(workflow_context, status) + else: + execution.status = execution.CANCELLED + execution.ended_at = datetime.utcnow() + workflow_context.execution = execution + + [email protected]_force_cancelled_workflow_signal.connect +def _workflow_force_cancelled(workflow_context, *args, **kwargs): + execution = workflow_context.execution + if execution.status in execution.END_STATES: + return + execution.status = execution.FORCE_CANCELLED workflow_context.execution = execution @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): execution = workflow_context.execution - if execution.status == execution.PENDING: + status = execution.status + if status == execution.PENDING: return _workflow_cancelled(workflow_context=workflow_context) - execution.status = execution.CANCELLING - workflow_context.execution = execution + # the execution may have already been finished + elif status == execution.SUCCEEDED or status == execution.FAILED: + _log_execution_already_ended(workflow_context, status) + else: + execution.status = execution.CANCELLING + workflow_context.execution = execution def _update_node_state_if_necessary(task, is_transitional=False): @@ -135,3 +159,9 @@ def _update_node_state_if_necessary(task, is_transitional=False): if state: node.state = state task.context.model.node.update(node) + + +def _log_execution_already_ended(workflow_context, status): + workflow_context.logger.info( + "'{workflow_name}' workflow execution {status} before the cancel request" + "was received'".format(workflow_name=workflow_context.workflow_name, status=status)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index d64cdba..2da2154 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -310,40 +310,40 @@ class TestExecution(object): Execution.CANCELLED, Execution.PENDING], Execution.STARTED: [Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING, Execution.STARTED], Execution.CANCELLING: [Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING], Execution.FAILED: [Execution.FAILED], - Execution.TERMINATED: [Execution.TERMINATED], + Execution.SUCCEEDED: [Execution.SUCCEEDED], Execution.CANCELLED: [Execution.CANCELLED] } invalid_transitions = { Execution.PENDING: [Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLING], Execution.STARTED: [Execution.PENDING], Execution.CANCELLING: [Execution.PENDING, Execution.STARTED], Execution.FAILED: [Execution.PENDING, Execution.STARTED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING], - Execution.TERMINATED: [Execution.PENDING, - Execution.STARTED, - Execution.FAILED, - Execution.CANCELLED, - Execution.CANCELLING], + Execution.SUCCEEDED: [Execution.PENDING, + Execution.STARTED, + Execution.FAILED, + Execution.CANCELLED, + Execution.CANCELLING], Execution.CANCELLED: [Execution.PENDING, Execution.STARTED, Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLING], } http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index 54e940f..7374e50 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -86,7 +86,7 @@ def test_existing_active_executions(request, service, model): def test_existing_executions_but_no_active_ones(request, service, model): existing_terminated_execution = models.Execution( service=service, - status=models.Execution.TERMINATED, + status=models.Execution.SUCCEEDED, workflow_name='uninstall') model.execution.put(existing_terminated_execution) # no active executions exist, so no error should be raised http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 1a88f13..af9af17 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -157,7 +157,7 @@ class TestEngine(BaseTest): execution = workflow_context.execution assert execution.started_at <= execution.ended_at <= datetime.utcnow() assert execution.error is None - assert execution.status == models.Execution.TERMINATED + assert execution.status == models.Execution.SUCCEEDED def test_single_task_successful_execution(self, workflow_context, executor): @workflow
