Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-143-improve-cancelling-of-workflow-execution 884c3915e -> 4e81d7fff (forced update)
Improve execution cancelling Unhandled execution status transitions resulting from cancelling an execution via the CLI, that we identified and tried to address: 1. TERMINATED -> CANCELLING You cancel the execution, but by the time we try to set the status to CANCELLING, the execution thread has already finished, and the execution is therefore in SUCCEEDED status (the state previously named TERMINATED, renamed in this commit). 2. FAILED -> CANCELLING You cancel the execution, but by the time we try to set the status to CANCELLING, the execution thread has already encountered an error, and the execution is therefore in FAILED status. 3. TERMINATED -> CANCELLED Similar to #1, but with CANCELLED instead of CANCELLING. 4. FAILED -> CANCELLED Similar to #2, but with CANCELLED instead of CANCELLING. In all of the above cases (#1-#4), we skip updating the execution status, and log that the execution already succeeded/failed before we were able to cancel it. 5. CANCELLING -> STARTED You cancel the execution while it is still in pending state. Meanwhile, while the execution status was already set to CANCELLING, we try to set the execution status to STARTED. 6. CANCELLED -> STARTED Similar to #5, but after the status is set to CANCELLING, it also gets set to CANCELLED before attempting to set it to STARTED. In cases #5-#6, we skip updating the execution status, and nothing is logged. 
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4e81d7ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4e81d7ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4e81d7ff Branch: refs/heads/ARIA-143-improve-cancelling-of-workflow-execution Commit: 4e81d7fff45bae1a2ccb9f0b897179c95076cf3f Parents: 7a4a1dd Author: Avia Efrat <[email protected]> Authored: Thu Apr 20 13:23:32 2017 +0300 Committer: Avia Efrat <[email protected]> Committed: Tue Apr 25 18:21:11 2017 +0300 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 13 +++---- aria/modeling/orchestration.py | 9 +++-- aria/orchestrator/workflow_runner.py | 1 + .../workflows/core/events_handler.py | 38 ++++++++++++++------ aria/orchestrator/workflows/events_logging.py | 1 + tests/modeling/test_models.py | 22 ++++++------ tests/orchestrator/test_workflow_runner.py | 2 +- .../orchestrator/workflows/core/test_engine.py | 2 +- 8 files changed, 51 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 6a1f02a..8b61c02 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -139,7 +139,6 @@ def start(workflow_name, execution_thread_name = '{0}_{1}'.format(service_name, workflow_name) execution_thread = threading.ExceptionThread(target=workflow_runner.execute, name=execution_thread_name) - execution_thread.daemon = True # allows force-cancel to exit immediately logger.info('Starting {0}execution. 
Press Ctrl+C cancel'.format('dry ' if dry else '')) execution_thread.start() @@ -172,11 +171,7 @@ def start(workflow_name, def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator): logger.info('Cancelling execution. Press Ctrl+C again to force-cancel') - try: - workflow_runner.cancel() - while execution_thread.is_alive(): - execution_logging.log_list(log_iterator) - execution_thread.join(1) - except KeyboardInterrupt: - logger.info('Force-cancelling execution') - # TODO handle execution (update status etc.) and exit process + workflow_runner.cancel() + while execution_thread.is_alive(): + execution_logging.log_list(log_iterator) + execution_thread.join(1) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index b9a75e9..f163903 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -56,21 +56,20 @@ class ExecutionBase(ModelMixin): __private_fields__ = ['service_fk', 'service_template'] - TERMINATED = 'terminated' + SUCCEEDED = 'succeeded' FAILED = 'failed' CANCELLED = 'cancelled' PENDING = 'pending' STARTED = 'started' CANCELLING = 'cancelling' - FORCE_CANCELLING = 'force_cancelling' - STATES = (TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING) - END_STATES = (TERMINATED, FAILED, CANCELLED) + STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING) + END_STATES = (SUCCEEDED, FAILED, CANCELLED) VALID_TRANSITIONS = { PENDING: (STARTED, CANCELLED), STARTED: END_STATES + (CANCELLING,), - CANCELLING: END_STATES + (FORCE_CANCELLING,) + CANCELLING: END_STATES } @orm.validates('status') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git 
a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 8f25cce..05471f9 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -107,6 +107,7 @@ class WorkflowRunner(object): def cancel(self): self._engine.cancel_execution() + def _create_execution_model(self, inputs): execution = models.Execution( created_at=datetime.utcnow(), http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 7f61bfa..b5912da 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -81,6 +81,9 @@ def _task_succeeded(task, *args, **kwargs): @events.start_workflow_signal.connect def _workflow_started(workflow_context, *args, **kwargs): execution = workflow_context.execution + # the execution may already be in a cancelling process + if execution.status == execution.CANCELLING or execution == execution.CANCELLED: + return execution.status = execution.STARTED execution.started_at = datetime.utcnow() workflow_context.execution = execution @@ -98,7 +101,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs): @events.on_success_workflow_signal.connect def _workflow_succeeded(workflow_context, *args, **kwargs): execution = workflow_context.execution - execution.status = execution.TERMINATED + execution.status = execution.SUCCEEDED execution.ended_at = datetime.utcnow() workflow_context.execution = execution @@ -106,22 +109,31 @@ def _workflow_succeeded(workflow_context, *args, **kwargs): @events.on_cancelled_workflow_signal.connect def _workflow_cancelled(workflow_context, *args, **kwargs): execution = workflow_context.execution - # _workflow_cancelling function may have called this function - 
# already - if execution.status == execution.CANCELLED: + status = execution.status + # _workflow_cancelling function may have called this function already + if status == execution.CANCELLED: return - execution.status = execution.CANCELLED - execution.ended_at = datetime.utcnow() - workflow_context.execution = execution + # the execution may have already been finished + elif status == execution.SUCCEEDED or status == execution.FAILED: + _log_execution_already_ended(workflow_context, status) + else: + execution.status = execution.CANCELLED + execution.ended_at = datetime.utcnow() + workflow_context.execution = execution @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): execution = workflow_context.execution - if execution.status == execution.PENDING: + status = execution.status + if status == execution.PENDING: return _workflow_cancelled(workflow_context=workflow_context) - execution.status = execution.CANCELLING - workflow_context.execution = execution + # the execution may have already been finished + elif status == execution.SUCCEEDED or status == execution.FAILED: + _log_execution_already_ended(workflow_context, status) + else: + execution.status = execution.CANCELLING + workflow_context.execution = execution def _update_node_state_if_necessary(task, is_transitional=False): @@ -135,3 +147,9 @@ def _update_node_state_if_necessary(task, is_transitional=False): if state: node.state = state task.context.model.node.update(node) + + +def _log_execution_already_ended(workflow_context, status): + workflow_context.logger.info( + "'{workflow_name}' workflow execution {status} before the cancel request" + "was fully processed '".format(workflow_name=workflow_context.workflow_name, status=status)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git 
a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 7d15c81..3ffe18b 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -52,6 +52,7 @@ def _failure_operation_handler(task, traceback, **kwargs): .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback) ) + @events.start_workflow_signal.connect def _start_workflow_handler(context, **kwargs): context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index d64cdba..2da2154 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -310,40 +310,40 @@ class TestExecution(object): Execution.CANCELLED, Execution.PENDING], Execution.STARTED: [Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING, Execution.STARTED], Execution.CANCELLING: [Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING], Execution.FAILED: [Execution.FAILED], - Execution.TERMINATED: [Execution.TERMINATED], + Execution.SUCCEEDED: [Execution.SUCCEEDED], Execution.CANCELLED: [Execution.CANCELLED] } invalid_transitions = { Execution.PENDING: [Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLING], Execution.STARTED: [Execution.PENDING], Execution.CANCELLING: [Execution.PENDING, Execution.STARTED], Execution.FAILED: [Execution.PENDING, Execution.STARTED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING], - Execution.TERMINATED: [Execution.PENDING, - Execution.STARTED, - Execution.FAILED, - Execution.CANCELLED, - Execution.CANCELLING], + 
Execution.SUCCEEDED: [Execution.PENDING, + Execution.STARTED, + Execution.FAILED, + Execution.CANCELLED, + Execution.CANCELLING], Execution.CANCELLED: [Execution.PENDING, Execution.STARTED, Execution.FAILED, - Execution.TERMINATED, + Execution.SUCCEEDED, Execution.CANCELLING], } http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index 54e940f..7374e50 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -86,7 +86,7 @@ def test_existing_active_executions(request, service, model): def test_existing_executions_but_no_active_ones(request, service, model): existing_terminated_execution = models.Execution( service=service, - status=models.Execution.TERMINATED, + status=models.Execution.SUCCEEDED, workflow_name='uninstall') model.execution.put(existing_terminated_execution) # no active executions exist, so no error should be raised http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4e81d7ff/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 1a88f13..af9af17 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -157,7 +157,7 @@ class TestEngine(BaseTest): execution = workflow_context.execution assert execution.started_at <= execution.ended_at <= datetime.utcnow() assert execution.error is None - assert execution.status == models.Execution.TERMINATED + assert execution.status == models.Execution.SUCCEEDED def test_single_task_successful_execution(self, workflow_context, executor): @workflow
