Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-11-wf-cancel [created] a4b7894a5


ARIA-11 Add cancel workflow execution support


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a4b7894a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a4b7894a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a4b7894a

Branch: refs/heads/ARIA-11-wf-cancel
Commit: a4b7894a5dc84d3b35db9d5bbf7787533eaa2f54
Parents: 9a44178
Author: Dan Kilman <[email protected]>
Authored: Wed Nov 9 16:37:43 2016 +0200
Committer: Dan Kilman <[email protected]>
Committed: Wed Nov 9 18:48:59 2016 +0200

----------------------------------------------------------------------
 aria/context/workflow.py                     | 14 +++++
 aria/events/__init__.py                      |  2 +
 aria/events/builtin_event_handler.py         | 34 +++++++----
 aria/events/workflow_engine_event_handler.py | 14 ++++-
 aria/storage/models.py                       |  4 +-
 aria/workflows/core/engine.py                | 21 ++++++-
 tests/context/__init__.py                    | 14 +++++
 tests/context/test_workflow.py               | 59 +++++++++++++++++++
 tests/mock/context.py                        |  1 +
 tests/workflows/core/test_engine.py          | 71 +++++++++++++++++++++--
 10 files changed, 213 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
index 44db38d..9c5abdc 100644
--- a/aria/context/workflow.py
+++ b/aria/context/workflow.py
@@ -62,6 +62,20 @@ class WorkflowContext(logger.LoggerMixin):
         self.task_max_retries = task_max_retries
         self.task_retry_interval = task_retry_interval
 
+        try:
+            self.model.execution.get(self.execution_id)
+        except exceptions.StorageError:
+            execution_cls = self.model.execution.model_cls
+            execution = self.model.execution.model_cls(
+                id=self.execution_id,
+                deployment_id=self.deployment_id,
+                workflow_id=self.workflow_id,
+                blueprint_id=self.blueprint_id,
+                status=execution_cls.PENDING,
+                parameters=self.parameters,
+            )
+            self.model.execution.store(execution)
+
     def __repr__(self):
         return (
             '{name}(deployment_id={self.deployment_id}, '

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index 74f3e22..a158308 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -46,6 +46,8 @@ on_failure_task_signal = signal('failure_task_signal')
 
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
+on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
+on_cancel_workflow_signal = signal('on_cancel_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py 
b/aria/events/builtin_event_handler.py
index b54024b..11ee744 100644
--- a/aria/events/builtin_event_handler.py
+++ b/aria/events/builtin_event_handler.py
@@ -30,6 +30,8 @@ from . import (
     start_workflow_signal,
     on_success_workflow_signal,
     on_failure_workflow_signal,
+    on_cancel_workflow_signal,
+    on_cancelling_workflow_signal,
     sent_task_signal,
     start_task_signal,
     on_success_task_signal,
@@ -71,16 +73,9 @@ def _task_succeeded(task, *args, **kwargs):
 
 @start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    execution_cls = workflow_context.model.execution.model_cls
-    execution = execution_cls(
-        id=workflow_context.execution_id,
-        deployment_id=workflow_context.deployment_id,
-        workflow_id=workflow_context.workflow_id,
-        blueprint_id=workflow_context.blueprint_id,
-        status=execution_cls.PENDING,
-        started_at=datetime.utcnow(),
-        parameters=workflow_context.parameters,
-    )
+    execution = workflow_context.execution
+    execution.status = execution.STARTED
+    execution.started_at = datetime.utcnow()
     workflow_context.execution = execution
 
 
@@ -99,3 +94,22 @@ def _workflow_succeeded(workflow_context, *args, **kwargs):
     execution.status = execution.TERMINATED
     execution.ended_at = datetime.utcnow()
     workflow_context.execution = execution
+
+
+@on_cancel_workflow_signal.connect
+def _workflow_cancelled(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.status = execution.CANCELLED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.execution = execution
+
+
+@on_cancelling_workflow_signal.connect
+def _workflow_cancelling(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    # TODO: handle concurrency, locks may not be enough as this may
+    # be modified somewhere else entirely
+    if execution.status == execution.PENDING:
+        return _workflow_cancelled(workflow_context=workflow_context)
+    execution.status = execution.CANCELLING
+    workflow_context.execution = execution

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/workflow_engine_event_handler.py 
b/aria/events/workflow_engine_event_handler.py
index 2f74ded..4067674 100644
--- a/aria/events/workflow_engine_event_handler.py
+++ b/aria/events/workflow_engine_event_handler.py
@@ -27,7 +27,9 @@ from . import (
     on_failure_task_signal,
     start_workflow_signal,
     on_success_workflow_signal,
-    on_failure_workflow_signal
+    on_failure_workflow_signal,
+    on_cancel_workflow_signal,
+    on_cancelling_workflow_signal,
 )
 
 
@@ -60,3 +62,13 @@ def _failure_workflow_handler(context, **kwargs):
 @on_success_workflow_signal.connect
 def _success_workflow_handler(context, **kwargs):
     context.logger.debug('Event: Workflow success: 
{context.name}'.format(context=context))
+
+
+@on_cancel_workflow_signal.connect
+def _cancel_workflow_handler(context, **kwargs):
+    context.logger.debug('Event: Workflow cancelled: 
{context.name}'.format(context=context))
+
+
+@on_cancelling_workflow_signal.connect
+def _cancelling_workflow_handler(context, **kwargs):
+    context.logger.debug('Event: Workflow cancelling: 
{context.name}'.format(context=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index a3686de..eb943aa 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -191,7 +191,6 @@ class Execution(Model):
     PENDING = 'pending'
     STARTED = 'started'
     CANCELLING = 'cancelling'
-    FORCE_CANCELLING = 'force_cancelling'
     STATES = (
         TERMINATED,
         FAILED,
@@ -199,7 +198,6 @@ class Execution(Model):
         PENDING,
         STARTED,
         CANCELLING,
-        FORCE_CANCELLING,
     )
     END_STATES = [TERMINATED, FAILED, CANCELLED]
     ACTIVE_STATES = [state for state in STATES if state not in END_STATES]
@@ -209,7 +207,7 @@ class Execution(Model):
     deployment_id = Field(type=basestring)
     workflow_id = Field(type=basestring)
     blueprint_id = Field(type=basestring)
-    started_at = Field(type=datetime)
+    started_at = Field(type=datetime, default=None)
     ended_at = Field(type=datetime, default=None)
     error = Field(type=basestring, default=None)
     parameters = Field()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index 3ed137c..c4a049c 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -48,7 +48,7 @@ class Engine(logger.LoggerMixin):
         """
         try:
             events.start_workflow_signal.send(self._workflow_context)
-            while True:
+            while not self._is_cancelling():
                 for task in self._ended_tasks():
                     self._handle_ended_tasks(task)
                 for task in self._executable_tasks():
@@ -57,11 +57,28 @@ class Engine(logger.LoggerMixin):
                     break
                 else:
                     time.sleep(0.1)
-            events.on_success_workflow_signal.send(self._workflow_context)
+            if self._is_cancelling():
+                events.on_cancel_workflow_signal.send(self._workflow_context)
+            else:
+                events.on_success_workflow_signal.send(self._workflow_context)
         except BaseException as e:
             events.on_failure_workflow_signal.send(self._workflow_context, 
exception=e)
             raise
 
+    def cancel_request(self):
+        """
+        Send a cancel request to the engine. If execution already started, 
execution status
+        will be modified to 'cancelling' status. If execution is in pending 
mode, execution status
+        will be modified to 'cancelled' directly. If execution is in one of 
its ended states, an
+        AriaEngineError will be raised
+        """
+        if self._workflow_context.execution.status not in 
models.Execution.ACTIVE_STATES:
+            raise exceptions.AriaEngineError('Execution already ended')
+        events.on_cancelling_workflow_signal.send(self._workflow_context)
+
+    def _is_cancelling(self):
+        return self._workflow_context.execution.status == 
models.Execution.CANCELLING
+
     def _executable_tasks(self):
         now = datetime.utcnow()
         return (task for task in self._tasks_iter()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/context/__init__.py b/tests/context/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/context/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/context/test_workflow.py
----------------------------------------------------------------------
diff --git a/tests/context/test_workflow.py b/tests/context/test_workflow.py
new file mode 100644
index 0000000..2e19aa2
--- /dev/null
+++ b/tests/context/test_workflow.py
@@ -0,0 +1,59 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria import context, application_model_storage
+
+from ..mock import models
+from ..storage import InMemoryModelDriver
+
+
+class TestWorkflowContext(object):
+
+    def test_execution_creation_on_workflow_context_creation(self, storage):
+        self._create_ctx(storage)
+        execution = storage.execution.get(models.EXECUTION_ID)
+        assert execution.id == models.EXECUTION_ID
+        assert execution.deployment_id == models.DEPLOYMENT_ID
+        assert execution.workflow_id == models.WORKFLOW_ID
+        assert execution.blueprint_id == models.BLUEPRINT_ID
+        assert execution.status == storage.execution.model_cls.PENDING
+        assert execution.parameters == {}
+
+    def test_subsequent_workflow_context_creation_do_not_fail(self, storage):
+        self._create_ctx(storage)
+        self._create_ctx(storage)
+
+    @staticmethod
+    def _create_ctx(storage):
+        return context.workflow.WorkflowContext(
+            name='simple_context',
+            model_storage=storage,
+            resource_storage=None,
+            deployment_id=models.DEPLOYMENT_ID,
+            workflow_id=models.WORKFLOW_ID,
+            execution_id=models.EXECUTION_ID,
+            task_max_retries=models.TASK_MAX_RETRIES,
+            task_retry_interval=models.TASK_RETRY_INTERVAL
+        )
+
+
+@pytest.fixture(scope='function')
+def storage():
+    result = application_model_storage(InMemoryModelDriver())
+    result.setup()
+    result.deployment.store(models.get_deployment())
+    return result

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 13020f3..bef2437 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -22,6 +22,7 @@ from ..storage import InMemoryModelDriver
 def simple():
     storage = application_model_storage(InMemoryModelDriver())
     storage.setup()
+    storage.deployment.store(models.get_deployment())
     return context.workflow.WorkflowContext(
         name='simple_context',
         model_storage=storage,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a4b7894a/tests/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_engine.py 
b/tests/workflows/core/test_engine.py
index d0d41f3..748281e 100644
--- a/tests/workflows/core/test_engine.py
+++ b/tests/workflows/core/test_engine.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import time
+import threading
 from datetime import datetime
 
 import pytest
@@ -37,11 +38,20 @@ global_test_holder = {}
 
 class BaseTest(object):
 
+    @classmethod
+    def _execute(cls, workflow_func, workflow_context, executor):
+        eng = cls._engine(workflow_func=workflow_func,
+                          workflow_context=workflow_context,
+                          executor=executor)
+        eng.execute()
+        return eng
+
     @staticmethod
-    def _execute(workflow_func, workflow_context, executor):
+    def _engine(workflow_func, workflow_context, executor):
         graph = workflow_func(ctx=workflow_context)
-        eng = engine.Engine(executor=executor, 
workflow_context=workflow_context, tasks_graph=graph)
-        eng.execute()
+        return engine.Engine(executor=executor,
+                             workflow_context=workflow_context,
+                             tasks_graph=graph)
 
     @staticmethod
     def _op(func, ctx, inputs=None, max_retries=None, retry_interval=None):
@@ -78,9 +88,13 @@ class BaseTest(object):
             workflow_context.states.append('failure')
             workflow_context.exception = exception
 
+        def cancel_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('cancel')
+
         events.start_workflow_signal.connect(start_workflow_handler)
         events.on_success_workflow_signal.connect(success_workflow_handler)
         events.on_failure_workflow_signal.connect(failure_workflow_handler)
+        events.on_cancel_workflow_signal.connect(cancel_workflow_handler)
         events.sent_task_signal.connect(sent_task_handler)
         try:
             yield
@@ -88,6 +102,7 @@ class BaseTest(object):
             events.start_workflow_signal.disconnect(start_workflow_handler)
             
events.on_success_workflow_signal.disconnect(success_workflow_handler)
             
events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+            
events.on_cancel_workflow_signal.disconnect(cancel_workflow_handler)
             events.sent_task_signal.disconnect(sent_task_handler)
 
     @pytest.fixture(scope='function')
@@ -189,16 +204,56 @@ class TestEngine(BaseTest):
         @workflow
         def mock_workflow(ctx, graph):
             graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
-
         self._execute(workflow_func=mock_workflow,
                       workflow_context=workflow_context,
                       executor=executor)
-
         assert workflow_context.states == ['start', 'success']
         assert workflow_context.exception is None
         assert global_test_holder.get('invocations') == [1, 2]
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
+    def test_cancel_started_execution(self, workflow_context, executor):
+        number_of_tasks = 100
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            return graph.sequence(*(self._op(mock_sleep_task, ctx, 
inputs={'seconds': 0.1})
+                                    for _ in range(number_of_tasks)))
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        t = threading.Thread(target=eng.execute)
+        t.start()
+        time.sleep(1)
+        eng.cancel_request()
+        t.join(timeout=30)
+        assert workflow_context.states == ['start', 'cancel']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert 0 < len(invocations) < number_of_tasks
+
+    def test_invalid_cancel_ended_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(**_):
+            pass
+        eng = self._execute(workflow_func=mock_workflow,
+                            workflow_context=workflow_context,
+                            executor=executor)
+        with pytest.raises(exceptions.AriaEngineError):
+            eng.cancel_request()
+
+    def test_cancel_pending_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(graph, **_):
+            return graph
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        eng.cancel_request()
+        # sanity to verify previous cancel request actually did something
+        with pytest.raises(exceptions.AriaEngineError):
+            eng.cancel_request()
+
 
 class TestRetries(BaseTest):
 
@@ -334,3 +389,9 @@ def mock_conditional_failure_task(failure_count):
             raise RuntimeError
     finally:
         invocations.append(time.time())
+
+
+def mock_sleep_task(seconds):
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(time.time())
+    time.sleep(seconds)

Reply via email to