Improve execution cancelling and implement force-cancelling

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0da51978
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0da51978
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0da51978

Branch: refs/heads/ARIA-143-improve-cancelling-of-workflow-execution
Commit: 0da519783567ec5ceee20297893ff235a13cb9e1
Parents: b446643
Author: Avia Efrat <[email protected]>
Authored: Tue Apr 25 15:00:03 2017 +0300
Committer: Avia Efrat <[email protected]>
Committed: Tue Apr 25 15:00:03 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 |  2 +-
 aria/modeling/orchestration.py                  | 13 +++---
 aria/orchestrator/events.py                     |  1 +
 aria/orchestrator/workflow_runner.py            |  3 ++
 aria/orchestrator/workflows/core/engine.py      | 11 ++++-
 .../workflows/core/events_handler.py            | 48 ++++++++++++++++----
 tests/modeling/test_models.py                   | 22 ++++-----
 tests/orchestrator/test_workflow_runner.py      |  2 +-
 .../orchestrator/workflows/core/test_engine.py  |  2 +-
 9 files changed, 73 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 6a1f02a..343ce6f 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -179,4 +179,4 @@ def _cancel_execution(workflow_runner, execution_thread, 
logger, log_iterator):
             execution_thread.join(1)
     except KeyboardInterrupt:
         logger.info('Force-cancelling execution')
-        # TODO handle execution (update status etc.) and exit process
+        workflow_runner.force_cancel()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index b9a75e9..f49591d 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -56,21 +56,22 @@ class ExecutionBase(ModelMixin):
     __private_fields__ = ['service_fk',
                           'service_template']
 
-    TERMINATED = 'terminated'
+    SUCCEEDED = 'succeeded'
     FAILED = 'failed'
     CANCELLED = 'cancelled'
     PENDING = 'pending'
     STARTED = 'started'
     CANCELLING = 'cancelling'
-    FORCE_CANCELLING = 'force_cancelling'
+    FORCE_CANCELLED = 'force_cancelled'
 
-    STATES = (TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, 
FORCE_CANCELLING)
-    END_STATES = (TERMINATED, FAILED, CANCELLED)
+    STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, 
FORCE_CANCELLED)
+    END_STATES = (SUCCEEDED, FAILED, CANCELLED, FORCE_CANCELLED)
+    CANCEL_STATES = (CANCELLING, CANCELLED, FORCE_CANCELLED)
 
     VALID_TRANSITIONS = {
-        PENDING: (STARTED, CANCELLED),
+        PENDING: (STARTED, CANCELLED, FORCE_CANCELLED),
         STARTED: END_STATES + (CANCELLING,),
-        CANCELLING: END_STATES + (FORCE_CANCELLING,)
+        CANCELLING: END_STATES
     }
 
     @orm.validates('status')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1c4922..bcc0627 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -32,5 +32,6 @@ on_failure_task_signal = signal('failure_task_signal')
 start_workflow_signal = signal('start_workflow_signal')
 on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
 on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
+on_force_cancelled_workflow_signal = 
signal('on_force_cancelled_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py 
b/aria/orchestrator/workflow_runner.py
index 8f25cce..32cf02d 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -107,6 +107,9 @@ class WorkflowRunner(object):
     def cancel(self):
         self._engine.cancel_execution()
 
+    def force_cancel(self):
+        self._engine.force_cancel_execution()
+
     def _create_execution_model(self, inputs):
         execution = models.Execution(
             created_at=datetime.utcnow(),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py 
b/aria/orchestrator/workflows/core/engine.py
index 155d0ee..18fabed 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -51,6 +51,11 @@ class Engine(logger.LoggerMixin):
         execute the workflow
         """
         try:
+            import os
+            print os.getpid()
+            print "going to sleep"
+            time.sleep(10)
+            print "woke up from sleep"
             events.start_workflow_signal.send(self._workflow_context)
             while True:
                 cancel = self._is_cancel()
@@ -81,9 +86,11 @@ class Engine(logger.LoggerMixin):
         """
         events.on_cancelling_workflow_signal.send(self._workflow_context)
 
+    def force_cancel_execution(self):
+        events.on_force_cancelled_workflow_signal.send(self._workflow_context)
+
     def _is_cancel(self):
-        return self._workflow_context.execution.status in 
(models.Execution.CANCELLING,
-                                                           
models.Execution.CANCELLED)
+        return self._workflow_context.execution.status in 
models.Execution.CANCEL_STATES
 
     def _executable_tasks(self):
         now = datetime.utcnow()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py 
b/aria/orchestrator/workflows/core/events_handler.py
index 7f61bfa..f4fd6b2 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -28,6 +28,9 @@ from datetime import (
 from ... import events
 from ... import exceptions
 
+EXECUTION_ALREADY_ENDED_TEMPLATE = "'{workflow_name}' workflow execution 
{status} " \
+                                   "before the cancel request was received'"
+
 
 @events.sent_task_signal.connect
 def _task_sent(task, *args, **kwargs):
@@ -81,6 +84,9 @@ def _task_succeeded(task, *args, **kwargs):
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
+    # the execution may already be in a cancelling process
+    if execution.status in execution.CANCEL_STATES:
+        return
     execution.status = execution.STARTED
     execution.started_at = datetime.utcnow()
     workflow_context.execution = execution
@@ -98,7 +104,7 @@ def _workflow_failed(workflow_context, exception, *args, 
**kwargs):
 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
-    execution.status = execution.TERMINATED
+    execution.status = execution.SUCCEEDED
     execution.ended_at = datetime.utcnow()
     workflow_context.execution = execution
 
@@ -106,22 +112,40 @@ def _workflow_succeeded(workflow_context, *args, 
**kwargs):
 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
-    # _workflow_cancelling function may have called this function
-    # already
-    if execution.status == execution.CANCELLED:
+    status = execution.status
+    # _workflow_cancelling function may have called this function already
+    if status == execution.CANCELLED:
         return
-    execution.status = execution.CANCELLED
-    execution.ended_at = datetime.utcnow()
+    # the execution may have already been finished
+    elif status == execution.SUCCEEDED or status == execution.FAILED:
+        _log_execution_already_ended(workflow_context, status)
+    else:
+        execution.status = execution.CANCELLED
+        execution.ended_at = datetime.utcnow()
+        workflow_context.execution = execution
+
+
[email protected]_force_cancelled_workflow_signal.connect
+def _workflow_force_cancelled(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    if execution.status in execution.END_STATES:
+        return
+    execution.status = execution.FORCE_CANCELLED
     workflow_context.execution = execution
 
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
-    if execution.status == execution.PENDING:
+    status = execution.status
+    if status == execution.PENDING:
         return _workflow_cancelled(workflow_context=workflow_context)
-    execution.status = execution.CANCELLING
-    workflow_context.execution = execution
+    # the execution may have already been finished
+    elif status == execution.SUCCEEDED or status == execution.FAILED:
+        _log_execution_already_ended(workflow_context, status)
+    else:
+        execution.status = execution.CANCELLING
+        workflow_context.execution = execution
 
 
 def _update_node_state_if_necessary(task, is_transitional=False):
@@ -135,3 +159,9 @@ def _update_node_state_if_necessary(task, 
is_transitional=False):
         if state:
             node.state = state
             task.context.model.node.update(node)
+
+
+def _log_execution_already_ended(workflow_context, status):
+    workflow_context.logger.info(
+        "'{workflow_name}' workflow execution {status} before the cancel 
request"
+        "was received'".format(workflow_name=workflow_context.workflow_name, 
status=status))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index d64cdba..2da2154 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -310,40 +310,40 @@ class TestExecution(object):
                                 Execution.CANCELLED,
                                 Execution.PENDING],
             Execution.STARTED: [Execution.FAILED,
-                                Execution.TERMINATED,
+                                Execution.SUCCEEDED,
                                 Execution.CANCELLED,
                                 Execution.CANCELLING,
                                 Execution.STARTED],
             Execution.CANCELLING: [Execution.FAILED,
-                                   Execution.TERMINATED,
+                                   Execution.SUCCEEDED,
                                    Execution.CANCELLED,
                                    Execution.CANCELLING],
             Execution.FAILED: [Execution.FAILED],
-            Execution.TERMINATED: [Execution.TERMINATED],
+            Execution.SUCCEEDED: [Execution.SUCCEEDED],
             Execution.CANCELLED: [Execution.CANCELLED]
         }
 
         invalid_transitions = {
             Execution.PENDING: [Execution.FAILED,
-                                Execution.TERMINATED,
+                                Execution.SUCCEEDED,
                                 Execution.CANCELLING],
             Execution.STARTED: [Execution.PENDING],
             Execution.CANCELLING: [Execution.PENDING,
                                    Execution.STARTED],
             Execution.FAILED: [Execution.PENDING,
                                Execution.STARTED,
-                               Execution.TERMINATED,
+                               Execution.SUCCEEDED,
                                Execution.CANCELLED,
                                Execution.CANCELLING],
-            Execution.TERMINATED: [Execution.PENDING,
-                                   Execution.STARTED,
-                                   Execution.FAILED,
-                                   Execution.CANCELLED,
-                                   Execution.CANCELLING],
+            Execution.SUCCEEDED: [Execution.PENDING,
+                                  Execution.STARTED,
+                                  Execution.FAILED,
+                                  Execution.CANCELLED,
+                                  Execution.CANCELLING],
             Execution.CANCELLED: [Execution.PENDING,
                                   Execution.STARTED,
                                   Execution.FAILED,
-                                  Execution.TERMINATED,
+                                  Execution.SUCCEEDED,
                                   Execution.CANCELLING],
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py 
b/tests/orchestrator/test_workflow_runner.py
index 54e940f..7374e50 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -86,7 +86,7 @@ def test_existing_active_executions(request, service, model):
 def test_existing_executions_but_no_active_ones(request, service, model):
     existing_terminated_execution = models.Execution(
         service=service,
-        status=models.Execution.TERMINATED,
+        status=models.Execution.SUCCEEDED,
         workflow_name='uninstall')
     model.execution.put(existing_terminated_execution)
     # no active executions exist, so no error should be raised

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0da51978/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py 
b/tests/orchestrator/workflows/core/test_engine.py
index 1a88f13..af9af17 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -157,7 +157,7 @@ class TestEngine(BaseTest):
         execution = workflow_context.execution
         assert execution.started_at <= execution.ended_at <= datetime.utcnow()
         assert execution.error is None
-        assert execution.status == models.Execution.TERMINATED
+        assert execution.status == models.Execution.SUCCEEDED
 
     def test_single_task_successful_execution(self, workflow_context, 
executor):
         @workflow

Reply via email to