Repository: incubator-ariatosca
Updated Branches:
  refs/heads/stub_task_branch 9bdc64537 -> 55366e64b (forced update)


wip2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/55366e64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/55366e64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/55366e64

Branch: refs/heads/stub_task_branch
Commit: 55366e64b69795363e897a6bd185f12ec14990ea
Parents: 282fcbf
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun Apr 30 19:54:12 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun Apr 30 20:11:04 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/events.py                     |  8 ----
 aria/orchestrator/workflows/api/task.py         | 34 ++++++++-------
 aria/orchestrator/workflows/builtin/utils.py    | 38 ++++++++--------
 aria/orchestrator/workflows/core/engine.py      | 10 ++---
 .../workflows/core/events_handler.py            |  3 ++
 aria/orchestrator/workflows/core/task.py        | 46 +++++++++++++-------
 aria/orchestrator/workflows/core/translation.py | 13 +++---
 aria/orchestrator/workflows/executor/dry.py     | 32 +++++---------
 tests/end2end/test_hello_world.py               |  5 +--
 .../test_task_graph_into_execution_graph.py     |  4 +-
 10 files changed, 98 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index 812040b..a1c4922 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -28,14 +28,6 @@ start_task_signal = signal('start_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
 
-# node state signals:
-# Note that each signal corresponds with a task. The basic start_task_signal also changes the state
-# of the node on which it runs. (so does the on_success_task_signal and the on_failure_task_signal)
-start_node_signal = signal('start_task_signal')
-on_success_node_signal = signal('success_task_signal')
-on_failure_node_signal = signal('failure_task_signal')
-
-
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
 on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 82c40c3..15397c3 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -66,7 +66,8 @@ class OperationTask(BaseTask):
                  inputs=None,
                  max_attempts=None,
                  retry_interval=None,
-                 ignore_failure=None):
+                 ignore_failure=None,
+                 is_stub=False):
         """
         Do not call this constructor directly. Instead, use :meth:`for_node` or
         :meth:`for_relationship`.
@@ -87,15 +88,18 @@ class OperationTask(BaseTask):
                                if ignore_failure is None else ignore_failure)
         self.interface_name = interface_name
         self.operation_name = operation_name
+        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+                                                     name=actor.name,
+                                                     interface=self.interface_name,
+                                                     operation=self.operation_name)
+        self.is_stub = is_stub
+        if self.is_stub:
+            return
 
         operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
         self.plugin = operation.plugin
         self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
         self.implementation = operation.implementation
-        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
-                                                     name=actor.name,
-                                                     interface=self.interface_name,
-                                                     operation=self.operation_name)
 
     def __repr__(self):
         return self.name
@@ -108,7 +112,8 @@ class OperationTask(BaseTask):
                  max_attempts=None,
                  retry_interval=None,
                  ignore_failure=None,
-                 inputs=None):
+                 inputs=None,
+                 is_stub=False):
         """
         Creates an operation on a node.
 
@@ -132,7 +137,9 @@ class OperationTask(BaseTask):
             max_attempts=max_attempts,
             retry_interval=retry_interval,
             ignore_failure=ignore_failure,
-            inputs=inputs)
+            inputs=inputs,
+            is_stub=is_stub
+        )
 
     @classmethod
     def for_relationship(cls,
@@ -142,7 +149,8 @@ class OperationTask(BaseTask):
                          max_attempts=None,
                          retry_interval=None,
                          ignore_failure=None,
-                         inputs=None):
+                         inputs=None,
+                         is_stub=False):
         """
         Creates an operation on a relationship edge.
 
@@ -166,7 +174,9 @@ class OperationTask(BaseTask):
             max_attempts=max_attempts,
             retry_interval=retry_interval,
             ignore_failure=ignore_failure,
-            inputs=inputs)
+            inputs=inputs,
+            is_stub=is_stub
+        )
 
 
 class WorkflowTask(BaseTask):
@@ -197,9 +207,3 @@ class WorkflowTask(BaseTask):
             return getattr(self._graph, item)
         except AttributeError:
             return super(WorkflowTask, self).__getattribute__(item)
-
-
-class StubTask(BaseTask):
-    """
-    Enables creating empty tasks.
-    """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index 2254d13..e649006 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ..api.task import OperationTask, StubTask
+from ..api.task import OperationTask
 from .. import exceptions
 
 
@@ -23,12 +23,10 @@ def create_node_task(node, interface_name, operation_name, **kwargs):
     """
 
     try:
-        if _is_empty_task(node, interface_name, operation_name):
-            return StubTask()
-
         return OperationTask.for_node(node=node,
                                       interface_name=interface_name,
                                       operation_name=operation_name,
+                                      is_stub=_is_empty_task(node, interface_name, operation_name),
                                       **kwargs)
     except exceptions.OperationNotFoundException:
         # We will skip nodes which do not have the operation
@@ -71,29 +69,29 @@ def relationship_tasks(relationship, interface_name, source_operation_name=None,
     operations = []
     if source_operation_name:
         try:
-            if _is_empty_task(relationship, interface_name, source_operation_name):
-                operations.append(StubTask())
-            else:
-                operations.append(
-                    OperationTask.for_relationship(relationship=relationship,
-                                                   interface_name=interface_name,
-                                                   operation_name=source_operation_name,
-                                                   **kwargs)
+            operations.append(
+                OperationTask.for_relationship(
+                    relationship=relationship,
+                    interface_name=interface_name,
+                    operation_name=source_operation_name,
+                    is_stub=_is_empty_task(relationship, interface_name, source_operation_name),
+                    **kwargs
                 )
+            )
         except exceptions.OperationNotFoundException:
             # We will skip relationships which do not have the operation
             pass
     if target_operation_name:
         try:
-            if _is_empty_task(relationship, interface_name, target_operation_name):
-                operations.append(StubTask())
-            else:
-                operations.append(
-                    OperationTask.for_relationship(relationship=relationship,
-                                                   interface_name=interface_name,
-                                                   operation_name=target_operation_name,
-                                                   **kwargs)
+            operations.append(
+                OperationTask.for_relationship(
+                    relationship=relationship,
+                    interface_name=interface_name,
+                    operation_name=target_operation_name,
+                    is_stub=_is_empty_task(relationship, interface_name, target_operation_name),
+                    **kwargs
                 )
+            )
         except exceptions.OperationNotFoundException:
             # We will skip relationships which do not have the operation
             pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 97c4999..51531f1 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -110,13 +110,11 @@ class Engine(logger.LoggerMixin):
                     self._workflow_context.model.task.refresh(task.model_task)
             yield task
 
-    def _handle_executable_task(self, task):
-        # import pydevd; pydevd.settrace('localhost', suspend=False)
-        if isinstance(task, engine_task.StubTask):
-            task.status = models.Task.SUCCESS
-        else:
+    @staticmethod
+    def _handle_executable_task(task):
+        if not isinstance(task, engine_task.MarkerTaskBase):
             events.sent_task_signal.send(task)
-            task.execute()
+        task.execute()
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index b43082b..83b79d5 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -41,6 +41,9 @@ def _task_started(task, *args, **kwargs):
         task.started_at = datetime.utcnow()
         task.status = task.STARTED
 
+@events.start_task_signal.connect
+def _node_task_started(task, *args, **kwargs):
+    with task._update():
         _update_node_state_if_necessary(task, is_transitional=True)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 548bf47..4a0b862 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -64,15 +64,17 @@ class BaseTask(object):
         return self._id
 
 
-class StubTask(BaseTask):
+class MarkerTaskBase(BaseTask):
     """
-    Base stub task for all tasks that don't actually run anything
+    Base stub task for marker user tasks that only mark the start/end of a workflow
+    or sub-workflow
     """
-
     def __init__(self, *args, **kwargs):
-        super(StubTask, self).__init__(executor=dry.StubExecutor(), *args, **kwargs)
+        super(MarkerTaskBase, self).__init__(executor=dry.MarkerExecutor(), *args, **kwargs)
         self.status = models.Task.PENDING
         self.due_at = datetime.utcnow()
+        self.started_at = None
+        self.ended_at = None
 
     def has_ended(self):
         return self.status in (models.Task.SUCCESS, models.Task.FAILED)
@@ -80,29 +82,37 @@ class StubTask(BaseTask):
     def is_waiting(self):
         return self.status in (models.Task.PENDING, models.Task.RETRYING)
 
+    def end(self):
+        self.ended_at = datetime.utcnow()
+        self.status = models.Task.SUCCESS
+
+    def start(self):
+        self.started_at = datetime.utcnow()
+        self.status = models.Task.STARTED
+
 
-class StartWorkflowTask(StubTask):
+class StartWorkflowTask(MarkerTaskBase):
     """
     Task marking a workflow start
     """
     pass
 
 
-class EndWorkflowTask(StubTask):
+class EndWorkflowTask(MarkerTaskBase):
     """
     Task marking a workflow end
     """
     pass
 
 
-class StartSubWorkflowTask(StubTask):
+class StartSubWorkflowTask(MarkerTaskBase):
     """
     Task marking a subworkflow start
     """
     pass
 
 
-class EndSubWorkflowTask(StubTask):
+class EndSubWorkflowTask(MarkerTaskBase):
     """
     Task marking a subworkflow end
     """
@@ -113,15 +123,18 @@ class OperationTask(BaseTask):
     """
     Operation task
     """
+    def __init__(self, api_task, executor=None, *args, **kwargs):
+        # If no executor is provided, we infer that this is a stub task which does not need to be
+        # executed.
+        super(OperationTask, self).__init__(
+            id=api_task.id, executor=executor or dry.StubExecutor(), *args, **kwargs)
 
-    def __init__(self, api_task, *args, **kwargs):
-        super(OperationTask, self).__init__(id=api_task.id, **kwargs)
         self._workflow_context = api_task._workflow_context
         self.interface_name = api_task.interface_name
         self.operation_name = api_task.operation_name
         model_storage = api_task._workflow_context.model
-        plugin = api_task.plugin
 
+        # This currently signal that this is a stub task
         base_task_model = model_storage.task.model_cls
         if isinstance(api_task.actor, models.Node):
             context_cls = operation_context.NodeOperationContext
@@ -141,15 +154,18 @@ class OperationTask(BaseTask):
 
         task_model = create_task_model(
             name=api_task.name,
-            implementation=api_task.implementation,
             actor=api_task.actor,
-            inputs=api_task.inputs,
             status=base_task_model.PENDING,
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
-            plugin=plugin,
-            execution=self._workflow_context.execution
+            execution=self._workflow_context.execution,
+
+            # Only non-stub tasks have these fields
+            plugin=getattr(api_task, 'plugin', None),
+            implementation=getattr(api_task, 'implementation', None),
+            inputs=getattr(api_task, 'inputs', {}),
+
         )
         self._workflow_context.model.task.put(task_model)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 1ae59a3..487d44d 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -49,9 +49,12 @@ def build_execution_graph(
             default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            # Add the task an the dependencies
-            operation_task = core_task.OperationTask(api_task, executor=executor)
+            if api_task.is_stub:
+                operation_task = core_task.OperationTask(api_task)
+            else:
+                operation_task = core_task.OperationTask(api_task, executor=executor)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
+
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
             build_execution_graph(
@@ -62,9 +65,6 @@ def build_execution_graph(
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
-        elif isinstance(api_task, api.task.StubTask):
-            stub_task = core_task.StubTask(id=api_task.id)
-            _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
         else:
             raise RuntimeError('Undefined state')
 
@@ -88,8 +88,7 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
     Returns task list from dependencies.
     """
     return [execution_graph.node[dependency.id
-                                 if isinstance(dependency, (api.task.OperationTask,
-                                                            api.task.StubTask))
+                                 if isinstance(dependency, api.task.OperationTask)
                                  else _end_graph_suffix(dependency.id)]['task']
             for dependency in dependencies] or default
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 5be8015..55d8f98 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -17,42 +17,32 @@
 Dry executor
 """
 
-from datetime import datetime
-
 from aria.orchestrator import events
 from .base import BaseExecutor
 
 
 # TODO: the name of this module should definitely change
 
-class StubExecutor(BaseExecutor):
-
-    @staticmethod
-    def _task_sent(*args, **kwargs):
-        pass
-
-    @staticmethod
-    def _task_started(task):
-        events.start_task_signal.send(task, skip_logging=True)
-
-    @staticmethod
-    def _task_succeeded(task):
-        events.on_success_task_signal.send(task, skip_logging=True)
+class MarkerExecutor(BaseExecutor):
+    def execute(self, task):
+        task.start()
+        task.end()
 
-    @staticmethod
-    def _task_failed(*args, **kwargs):
-        pass
 
+class StubExecutor(BaseExecutor):
     def execute(self, task):
-        pass
+        events.start_task_signal.send(task)
+        events.on_success_task_signal.send(task)
 
 
-class DryExecutor(StubExecutor):
+class DryExecutor(BaseExecutor):
     """
     Executor which dry runs tasks - prints task information without causing any side effects
     """
 
     def execute(self, task):
+        events.start_task_signal.send(task, skip_logging=True)
+
         if hasattr(task.actor, 'source_node'):
             name = '{source_node.name}->{target_node.name}'.format(
                 source_node=task.actor.source_node, target_node=task.actor.target_node)
@@ -66,3 +56,5 @@ class DryExecutor(StubExecutor):
         task.context.logger.info(
             '<dry> {name} {task.interface_name}.{task.operation_name} successful'
             .format(name=name, task=task))
+
+        events.on_success_task_signal.send(task, skip_logging=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index 8895593..dc8a2a3 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -30,8 +30,7 @@ def test_hello_world(testenv):
         # Even if some assertions failed, attempt to execute uninstall so the
         # webserver process doesn't stay up once the test is finished
         # TODO: remove force_service_delete=True
-        pass
-        # testenv.uninstall_service(force_service_delete=True)
+        testenv.uninstall_service(force_service_delete=True)
 
     _verify_webserver_down('http://localhost:9090')
     testenv.verify_clean_storage()
@@ -58,5 +57,5 @@ def _verify_deployed_service_in_storage(service_name, model_storage):
     assert service.name == service_name
     assert len(service.executions) == 1
     assert len(service.nodes) == 2
-    # TODO: validate node states
+    assert all(node.state == node.STARTED for node in service.nodes.values())
     assert len(service.executions[0].logs) > 0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/55366e64/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 514bce9..16cb47d 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -17,6 +17,7 @@ from networkx import topological_sort, DiGraph
 
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api, core
+from aria.orchestrator.workflows.executor import dry
 
 from tests import mock
 from tests import storage
@@ -63,7 +64,8 @@ def test_task_graph_into_execution_graph(tmpdir):
     # Direct check
     execution_graph = DiGraph()
     core.translation.build_execution_graph(task_graph=test_task_graph,
-                                           execution_graph=execution_graph)
+                                           execution_graph=execution_graph,
+                                           executor=dry.MarkerExecutor())
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7

Reply via email to