Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-14-workflow-engine-tests d08040bd6 -> f87d50aa2 (forced update)


ARIA-14 Implement initial engine tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f87d50aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f87d50aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f87d50aa

Branch: refs/heads/ARIA-14-workflow-engine-tests
Commit: f87d50aa2b2885782ce51a283609eb7342f9935a
Parents: c0bf347
Author: Dan Kilman <dankil...@gmail.com>
Authored: Tue Nov 1 16:42:34 2016 +0200
Committer: Dan Kilman <dankil...@gmail.com>
Committed: Thu Nov 3 00:25:27 2016 +0200

----------------------------------------------------------------------
 aria/contexts.py                     |   4 +-
 aria/events/__init__.py              |   1 +
 aria/events/builtin_event_handler.py |  15 ++-
 aria/storage/models.py               |   6 +-
 aria/tools/application.py            |  10 +-
 aria/workflows/core/engine.py        |  23 ++--
 aria/workflows/core/tasks.py         |  33 +++--
 tests/.pylintrc                      |   2 +-
 tests/workflows/test_engine.py       | 203 ++++++++++++++++++++++++++++++
 9 files changed, 268 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/aria/contexts.py
----------------------------------------------------------------------
diff --git a/aria/contexts.py b/aria/contexts.py
index ae7fc66..fdd26a2 100644
--- a/aria/contexts.py
+++ b/aria/contexts.py
@@ -201,11 +201,11 @@ class OperationContext(LoggerMixin):
         """
         The model operation
         """
-        return self.storage.operation.get(self.id)
+        return self.model.operation.get(self.id)
 
     @operation.setter
     def operation(self, value):
         """
         Store the operation in the model storage
         """
-        self.storage.operation.store(value)
+        self.model.operation.store(value)
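
Note: the storage -> model rename above presumably tracks the context exposing
model storage as a dedicated model attribute (resource storage being separate).
A minimal sketch of the getter/setter pattern, using a hypothetical
OperationContextSketch stand-in:

    # assumes a model attribute exposing an operation store with get()/store()
    class OperationContextSketch(object):  # hypothetical name
        def __init__(self, model, id):
            self.model = model
            self.id = id

        @property
        def operation(self):
            # fetch this context's Operation model from model storage
            return self.model.operation.get(self.id)

        @operation.setter
        def operation(self, value):
            # persist the (possibly mutated) Operation model
            self.model.operation.store(value)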

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index 6b07213..74f3e22 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -39,6 +39,7 @@ from blinker import signal
 from ..tools.plugin import plugin_installer
 
 # workflow engine task signals:
+sent_task_signal = signal('sent_task_signal')
 start_task_signal = signal('start_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
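
Note: the new sent_task_signal follows the same blinker pattern as the existing
task signals. A minimal, self-contained sketch of how such a signal is
connected and sent (handler and sender names are illustrative):

    from blinker import signal

    sent_task_signal = signal('sent_task_signal')

    @sent_task_signal.connect
    def _on_sent(task, *args, **kwargs):
        # blinker passes send()'s first positional argument as the sender
        print('task sent: %s' % task)

    sent_task_signal.send('some-task')  # prints: task sent: some-task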

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py
index ec3238f..2dfbd00 100644
--- a/aria/events/builtin_event_handler.py
+++ b/aria/events/builtin_event_handler.py
@@ -27,12 +27,21 @@ from . import (
     start_workflow_signal,
     on_success_workflow_signal,
     on_failure_workflow_signal,
+    sent_task_signal,
     start_task_signal,
     on_success_task_signal,
     on_failure_task_signal,
 )
 
 
+@sent_task_signal.connect
+def _task_sent(task, *args, **kwargs):
+    operation_context = task.context
+    operation = operation_context.operation
+    operation.status = operation.SENT
+    operation_context.operation = operation
+
+
 @start_task_signal.connect
 def _task_started(task, *args, **kwargs):
     operation_context = task.context
@@ -62,7 +71,7 @@ def _task_succeeded(task, *args, **kwargs):
 
 @start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    execution_cls = workflow_context.storage.execution.model_cls
+    execution_cls = workflow_context.model.execution.model_cls
     execution = execution_cls(
         id=workflow_context.execution_id,
         deployment_id=workflow_context.deployment_id,
@@ -80,7 +89,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
     execution = workflow_context.execution
     execution.error = str(exception)
     execution.status = execution.FAILED
-    execution.ended_at = datetime.utcnow(),
+    execution.ended_at = datetime.utcnow()
     workflow_context.execution = execution
 
 
@@ -88,5 +97,5 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
 def _workflow_succeeded(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
     execution.status = execution.TERMINATED
-    execution.ended_at = datetime.utcnow(),
+    execution.ended_at = datetime.utcnow()
     workflow_context.execution = execution
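
Note: the two ended_at fixes above remove a trailing comma that silently turned
each timestamp into a one-element tuple:

    from datetime import datetime

    buggy = datetime.utcnow(),  # trailing comma -> one-element tuple
    fixed = datetime.utcnow()   # datetime instance, as intended

    assert isinstance(buggy, tuple)
    assert isinstance(fixed, datetime)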

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 32403ed..e5fc1ac 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -221,11 +221,13 @@ class Operation(Model):
     A Model which represents an operation
     """
     PENDING = 'pending'
+    SENT = 'sent'
     STARTED = 'started'
     SUCCESS = 'success'
     FAILED = 'failed'
     STATES = (
         PENDING,
+        SENT,
         STARTED,
         SUCCESS,
         FAILED,
@@ -233,9 +235,9 @@ class Operation(Model):
     END_STATES = [SUCCESS, FAILED]
 
     id = Field(type=basestring, default=uuid_generator)
-    status = Field(type=basestring, choices=STATES, default=STARTED)
+    status = Field(type=basestring, choices=STATES, default=PENDING)
     execution_id = Field(type=basestring)
-    eta = Field(type=datetime, default=0)
+    eta = Field(type=datetime, default=datetime.now)
     started_at = Field(type=datetime, default=None)
     ended_at = Field(type=datetime, default=None)
     max_retries = Field(type=int, default=0)
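
Note: switching eta's default from 0 to datetime.now keeps the field
type-consistent and, assuming Field invokes callable defaults per instance (as
the uuid_generator default for id suggests), stamps each new Operation with its
creation time. A hypothetical sketch of that callable-default behavior:

    from datetime import datetime

    class FieldSketch(object):
        # hypothetical stand-in for aria.storage's Field descriptor
        def __init__(self, type, default=None):
            self._type, self._default = type, default

        def default_value(self):
            # a callable default is evaluated per instance;
            # a plain value would be shared by all instances
            return self._default() if callable(self._default) else self._default

    eta = FieldSketch(type=datetime, default=datetime.now)
    print(eta.default_value())  # fresh timestamp each call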

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/aria/tools/application.py
----------------------------------------------------------------------
diff --git a/aria/tools/application.py b/aria/tools/application.py
index 32feeff..ddc1317 100644
--- a/aria/tools/application.py
+++ b/aria/tools/application.py
@@ -85,11 +85,11 @@ class StorageManager(LoggerMixin):
         Create a StorageManager from a blueprint
         """
         return cls(
-            model_storage,
-            resource_storage,
-            blueprint_path,
-            blueprint_plan,
-            blueprint_id,
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            blueprint_path=blueprint_path,
+            blueprint_plan=blueprint_plan,
+            blueprint_id=blueprint_id,
             deployment_id=None,
             deployment_plan=None)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index 9c6eff8..5cd4604 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -18,12 +18,15 @@ The workflow engine. Executes workflows
 """
 
 import time
+from datetime import datetime
 
 import networkx
 
 from aria import events, logger
+from aria.storage import models
 
 from . import translation
+from . import tasks
 
 
 class Engine(logger.LoggerMixin):
@@ -34,10 +37,9 @@ class Engine(logger.LoggerMixin):
     def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
-        self._tasks_graph = tasks_graph
         self._execution_graph = networkx.DiGraph()
         self._executor = executor
-        translation.build_execution_graph(task_graph=self._tasks_graph,
+        translation.build_execution_graph(task_graph=tasks_graph,
                                          workflow_context=workflow_context,
                                          execution_graph=self._execution_graph)
 
@@ -62,17 +64,18 @@ class Engine(logger.LoggerMixin):
             raise
 
     def _executable_tasks(self):
-        now = time.time()
+        now = datetime.now()
         return (task for task in self._tasks_iter()
-                if task.status == task.PENDING and
+                if task.status == models.Operation.PENDING and
                 task.eta <= now and
                 not self._task_has_dependencies(task))
 
     def _ended_tasks(self):
-        return (task for task in self._tasks_iter() if task.status in task.END_STATES)
+        return (task for task in self._tasks_iter()
+                if task.status in models.Operation.END_STATES)
 
     def _task_has_dependencies(self, task):
-        return len(self._execution_graph.succ.get(task.id, {})) > 0
+        return len(self._execution_graph.pred.get(task.id, {})) > 0
 
     def _all_tasks_consumed(self):
         return len(self._execution_graph.node) == 0
@@ -81,10 +84,14 @@ class Engine(logger.LoggerMixin):
        return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
 
     def _handle_executable_task(self, task):
-        self._executor.execute(task)
+        if isinstance(task, tasks.BaseWorkflowTask):
+            task.status = models.Operation.SUCCESS
+        else:
+            events.sent_task_signal.send(task)
+            self._executor.execute(task)
 
     def _handle_ended_tasks(self, task):
-        if task.status == task.FAILED:
+        if task.status == models.Operation.FAILED:
             raise RuntimeError('Workflow failed')
         else:
             self._execution_graph.remove_node(task.id)
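
Note: the succ -> pred fix implies that in the execution graph a task's
dependencies are its predecessors, i.e. edges point dependency -> dependent;
checking successors would have blocked a task on its dependents instead. A
minimal networkx illustration under that assumed edge direction:

    import networkx

    g = networkx.DiGraph()
    g.add_edge('op1', 'op2')  # op1 must complete before op2

    print(len(g.pred['op2']) > 0)  # True:  op2 still has a dependency
    print(len(g.pred['op1']) > 0)  # False: op1 is executable now

    g.remove_node('op1')           # op1 ended; the engine removes it
    print(len(g.pred['op2']) > 0)  # False: op2 becomes executable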

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/aria/workflows/core/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py
index 76ae609..98d7c13 100644
--- a/aria/workflows/core/tasks.py
+++ b/aria/workflows/core/tasks.py
@@ -17,13 +17,19 @@
 Workflow tasks
 """
 
+from datetime import datetime
 
-class BaseTask(object):
+from aria import logger
+from aria.storage import models
+
+
+class BaseTask(logger.LoggerMixin):
     """
     Base class for Task objects
     """
 
-    def __init__(self, id, name, context):
+    def __init__(self, id, name, context, *args, **kwargs):
+        super(BaseTask, self).__init__(*args, **kwargs)
         self._id = id
         self._name = name
         self._context = context
@@ -50,28 +56,39 @@ class BaseTask(object):
         return self._context
 
 
-class StartWorkflowTask(BaseTask):
+class BaseWorkflowTask(BaseTask):
+    """
+    Base class for all workflow wrapping tasks
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(BaseWorkflowTask, self).__init__(*args, **kwargs)
+        self.status = models.Operation.PENDING
+        self.eta = datetime.now()
+
+
+class StartWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a workflow start
     """
     pass
 
 
-class EndWorkflowTask(BaseTask):
+class EndWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a workflow end
     """
     pass
 
 
-class StartSubWorkflowTask(BaseTask):
+class StartSubWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a subworkflow start
     """
     pass
 
 
-class EndSubWorkflowTask(BaseTask):
+class EndSubWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a subworkflow end
     """
@@ -88,7 +105,7 @@ class OperationTask(BaseTask):
         self._create_operation_in_storage()
 
     def _create_operation_in_storage(self):
-        operation_cls = self.context.storage.operation.model_cls
+        operation_cls = self.context.model.operation.model_cls
         operation = operation_cls(
             id=self.context.id,
             execution_id=self.context.execution_id,
@@ -99,6 +116,6 @@ class OperationTask(BaseTask):
 
     def __getattr__(self, attr):
         try:
-            return getattr(self.context, attr)
+            return getattr(self.context.operation, attr)
         except AttributeError:
             return super(OperationTask, self).__getattribute__(attr)
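
Note: the __getattr__ fix forwards unresolved attribute lookups (status, eta,
END_STATES and the like) to the stored Operation model rather than to the
context, which is what lets the engine poll an OperationTask uniformly. A
minimal sketch of that delegation pattern:

    class Delegator(object):
        # hypothetical stand-in for OperationTask's attribute forwarding
        def __init__(self, target):
            self._target = target

        def __getattr__(self, attr):
            # invoked only when normal lookup fails on the wrapper
            try:
                return getattr(self._target, attr)
            except AttributeError:
                return super(Delegator, self).__getattribute__(attr)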

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/tests/.pylintrc
----------------------------------------------------------------------
diff --git a/tests/.pylintrc b/tests/.pylintrc
index f6cfd7a..0f84473 100644
--- a/tests/.pylintrc
+++ b/tests/.pylintrc
@@ -62,7 +62,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use"--disable=all --enable=classes
 # --disable=W"
-disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,too-many-locals
 
 [REPORTS]
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f87d50aa/tests/workflows/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py
new file mode 100644
index 0000000..f491597
--- /dev/null
+++ b/tests/workflows/test_engine.py
@@ -0,0 +1,203 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+import aria
+from aria import events
+from aria import workflow
+from aria import contexts
+from aria.tools import application
+from aria.storage import drivers
+from aria.workflows.executor import thread
+from aria.workflows.core import engine
+
+
+global_test_holder = {}
+
+
+class TestEngine(object):
+
+    def test_empty_graph_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(**_):
+            pass
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert 'sent_task_signal_calls' not in global_test_holder
+
+    def test_single_task_successful_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(context, graph):
+            graph.add_task(self._op(mock_success_task, context))
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+    def test_single_task_failed_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(context, graph):
+            graph.add_task(self._op(mock_failed_task, context))
+        with pytest.raises(RuntimeError):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, RuntimeError)
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+    def test_two_tasks_execution_order(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(context, graph):
+            op1 = self._op(mock_ordered_task, context, inputs={'counter': 1})
+            op2 = self._op(mock_ordered_task, context, inputs={'counter': 2})
+            graph.chain(tasks=[op1, op2])
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    @staticmethod
+    def _execute(workflow_func, workflow_context, executor):
+        graph = workflow_func(context=workflow_context)
+        eng = engine.Engine(executor=executor,
+                            workflow_context=workflow_context,
+                            tasks_graph=graph)
+        eng.execute()
+
+    @staticmethod
+    def _op(function, context, inputs=None):
+        return context.operation(
+            name='task',
+            node_instance=None,
+            operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
+                name=function.__name__)},
+            inputs=inputs
+        )
+
+    @pytest.fixture(scope='function', autouse=True)
+    def globals_cleanup(self):
+        try:
+            yield
+        finally:
+            global_test_holder.clear()
+
+    @pytest.fixture(scope='function', autouse=True)
+    def signals_registration(self):
+        def sent_task_handler(*args, **kwargs):
+            calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
+            global_test_holder['sent_task_signal_calls'] = calls + 1
+
+        def start_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('start')
+
+        def success_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('success')
+
+        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+            workflow_context.states.append('failure')
+            workflow_context.exception = exception
+
+        events.start_workflow_signal.connect(start_workflow_handler)
+        events.on_success_workflow_signal.connect(success_workflow_handler)
+        events.on_failure_workflow_signal.connect(failure_workflow_handler)
+        events.sent_task_signal.connect(sent_task_handler)
+        try:
+            yield
+        finally:
+            events.start_workflow_signal.disconnect(start_workflow_handler)
+            events.on_success_workflow_signal.disconnect(success_workflow_handler)
+            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+            events.sent_task_signal.disconnect(sent_task_handler)
+
+    @pytest.fixture(scope='function')
+    def executor(self):
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
+
+    @pytest.fixture(scope='function')
+    def workflow_context(self, tmpdir):
+        from dsl_parser.parser import parse_from_path
+        from dsl_parser.tasks import prepare_deployment_plan
+        blueprint = 'tosca_definitions_version: cloudify_dsl_1_3\nnode_templates: {}'
+        blueprint_dir = tmpdir.mkdir('blueprint')
+        blueprint_path = blueprint_dir.join('blueprint.yaml')
+        blueprint_path.write(blueprint)
+        blueprint_plan = parse_from_path(str(blueprint_path))
+        blueprint_id = 'b1'
+        deployment_plan = prepare_deployment_plan(blueprint_plan.copy())
+        deployment_id = 'd1'
+        work_dir = tmpdir.mkdir('work')
+        storage_dir = work_dir.mkdir('storage')
+        model_storage_dir = storage_dir.mkdir('model')
+        resource_storage_dir = storage_dir.mkdir('resource')
+        model_storage = aria.application_model_storage(
+            drivers.FileSystemModelDriver(str(model_storage_dir)))
+        resource_storage = aria.application_resource_storage(
+            drivers.FileSystemResourceDriver(str(resource_storage_dir)))
+        resource_storage.setup()
+        model_storage.setup()
+        storage_manager = application.StorageManager(
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            blueprint_path=blueprint_path,
+            blueprint_id=blueprint_id,
+            blueprint_plan=blueprint_plan,
+            deployment_id=deployment_id,
+            deployment_plan=deployment_plan
+        )
+        storage_manager.create_blueprint_storage(
+            source=str(blueprint_path),
+            main_file_name='blueprint.yaml')
+        storage_manager.create_nodes_storage()
+        storage_manager.create_deployment_storage()
+        storage_manager.create_node_instances_storage()
+        result = contexts.WorkflowContext(
+            name='test',
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            deployment_id=deployment_id,
+            workflow_id='name')
+        result.states = []
+        result.exception = None
+        return result
+
+
+def mock_success_task():
+    pass
+
+
+def mock_failed_task():
+    raise RuntimeError
+
+
+def mock_ordered_task(counter):
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(counter)
