Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-214-Dry-execution-changes-the-state-of-non-implemented-operations db3f9aeaf -> 532da3ad8 (forced update)


ARIA-214 Dry execution changes the state of non implemented operations


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/532da3ad
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/532da3ad
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/532da3ad

Branch: refs/heads/ARIA-214-Dry-execution-changes-the-state-of-non-implemented-operations
Commit: 532da3ad8b1bfb8a207a11be73be904f61c6ce91
Parents: 1cb3086
Author: max-orlov <ma...@gigaspaces.com>
Authored: Sun May 7 16:12:56 2017 +0300
Committer: max-orlov <ma...@gigaspaces.com>
Committed: Sun May 7 18:03:09 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/task.py        |  3 ---
 aria/orchestrator/workflows/core/translation.py |  6 +----
 aria/orchestrator/workflows/executor/base.py    | 15 ++++++------
 aria/orchestrator/workflows/executor/celery.py  |  2 ++
 aria/orchestrator/workflows/executor/dry.py     | 24 ++++++++++----------
 aria/orchestrator/workflows/executor/process.py |  3 +++
 aria/orchestrator/workflows/executor/thread.py  |  2 ++
 tests/orchestrator/workflows/core/test_task.py  |  7 ++----
 .../workflows/executor/test_process_executor.py | 17 +-------------
 9 files changed, 31 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 0e081c2..755a4b9 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -163,9 +163,6 @@ class OperationTask(BaseTask):
         self._task_id = task_model.id
         self._update_fields = None
 
-    def execute(self):
-        super(OperationTask, self).execute()
-
     @contextmanager
     def _update(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index 0bbce90..fec108b 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -48,11 +48,7 @@ def build_execution_graph(
             execution_graph, dependencies, default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            if api_task.implementation:
-                operation_task = core_task.OperationTask(api_task, 
executor=default_executor)
-            else:
-                operation_task = core_task.OperationTask(api_task,
-                                                         
executor=base.EmptyOperationExecutor())
+            operation_task = core_task.OperationTask(api_task, 
executor=default_executor)
             _add_task_and_dependencies(execution_graph, operation_task, 
operation_dependencies)
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index a225837..a95e58f 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -31,7 +31,14 @@ class BaseExecutor(logger.LoggerMixin):
         Execute a task
         :param task: task to execute
         """
-        raise NotImplementedError
+        if task.model_task.implementation:
+            self._execute_empty_task(task)
+        else:
+            raise NotImplementedError
+
+    def _execute_empty_task(self, task):
+        self._task_started(task)
+        self._task_succeeded(task)
 
     def close(self):
         """
@@ -55,9 +62,3 @@ class BaseExecutor(logger.LoggerMixin):
 class StubTaskExecutor(BaseExecutor):
     def execute(self, task):
         task.status = task.SUCCESS
-
-
-class EmptyOperationExecutor(BaseExecutor):
-    def execute(self, task):
-        events.start_task_signal.send(task)
-        events.on_success_task_signal.send(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 7bd9b7c..0406d4e 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -43,6 +43,8 @@ class CeleryExecutor(BaseExecutor):
         self._started_queue.get(timeout=30)
 
     def execute(self, task):
+        if not task.implementation:
+            return self._execute_empty_task(task)
         self._tasks[task.id] = task
         inputs = dict(inp.unwrap() for inp in task.inputs.values())
         inputs['ctx'] = task.context

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index eb70a41..5a4ca73 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -25,7 +25,6 @@ class DryExecutor(BaseExecutor):
     """
     Executor which dry runs tasks - prints task information without causing 
any side effects
     """
-
     def execute(self, task):
         # updating the task manually instead of calling 
self._task_started(task),
         # to avoid any side effects raising that event might cause
@@ -33,19 +32,20 @@ class DryExecutor(BaseExecutor):
             task.started_at = datetime.utcnow()
             task.status = task.STARTED
 
-        if hasattr(task.actor, 'source_node'):
-            name = '{source_node.name}->{target_node.name}'.format(
-                source_node=task.actor.source_node, 
target_node=task.actor.target_node)
-        else:
-            name = task.actor.name
+        if task.implementation:
+            if hasattr(task.actor, 'source_node'):
+                name = '{source_node.name}->{target_node.name}'.format(
+                    source_node=task.actor.source_node, 
target_node=task.actor.target_node)
+            else:
+                name = task.actor.name
 
-        task.context.logger.info(
-            '<dry> {name} {task.interface_name}.{task.operation_name} 
started...'
-            .format(name=name, task=task))
+            task.context.logger.info(
+                '<dry> {name} {task.interface_name}.{task.operation_name} 
started...'
+                .format(name=name, task=task))
 
-        task.context.logger.info(
-            '<dry> {name} {task.interface_name}.{task.operation_name} 
successful'
-            .format(name=name, task=task))
+            task.context.logger.info(
+                '<dry> {name} {task.interface_name}.{task.operation_name} 
successful'
+                .format(name=name, task=task))
 
         # updating the task manually instead of calling 
self._task_succeeded(task),
         # to avoid any side effects raising that event might cause

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 8481406..3053c7e 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -119,6 +119,9 @@ class ProcessExecutor(base.BaseExecutor):
 
     def execute(self, task):
         self._check_closed()
+
+        if not task.implementation:
+            return self._execute_empty_task(task)
         self._tasks[task.id] = task
 
         # Temporary file used to pass arguments to the started subprocess

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 836b2bf..8e16870 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -47,6 +47,8 @@ class ThreadExecutor(BaseExecutor):
             self._pool.append(thread)
 
     def execute(self, task):
+        if not task.implementation:
+            return self._execute_empty_task(task)
         self._queue.put(task)
 
     def close(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 748ee20..50ca7f5 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -24,7 +24,6 @@ from aria.orchestrator.workflows import (
     api,
     core,
     exceptions,
-    executor
 )
 
 from tests import mock, storage
@@ -71,8 +70,7 @@ class TestOperationTask(object):
                 node,
                 interface_name=NODE_INTERFACE_NAME,
                 operation_name=NODE_OPERATION_NAME)
-            core_task = core.task.OperationTask(api_task=api_task,
-                                                
executor=executor.base.EmptyOperationExecutor())
+            core_task = core.task.OperationTask(api_task=api_task, 
executor=None)
         return api_task, core_task
 
     def _create_relationship_operation_task(self, ctx, relationship):
@@ -81,8 +79,7 @@ class TestOperationTask(object):
                 relationship,
                 interface_name=RELATIONSHIP_INTERFACE_NAME,
                 operation_name=RELATIONSHIP_OPERATION_NAME)
-            core_task = core.task.OperationTask(api_task=api_task,
-                                                
executor=executor.base.EmptyOperationExecutor())
+            core_task = core.task.OperationTask(api_task=api_task, 
executor=None)
         return api_task, core_task
 
     def test_node_operation_task_creation(self, ctx):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/532da3ad/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index b353518..6be2135 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -66,7 +66,7 @@ class TestProcessExecutor(object):
     def test_closed(self, executor):
         executor.close()
         with pytest.raises(RuntimeError) as exc_info:
-            executor.execute(task=None)
+            executor.execute(task=MockTask(implementation=None))
         assert 'closed' in exc_info.value.message
 
 
@@ -82,18 +82,3 @@ def mock_plugin(plugin_manager, tmpdir):
     source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
     plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
     return plugin_manager.install(source=plugin_path)
-
-
-class MockContext(object):
-
-    def __init__(self, *args, **kwargs):
-        self.logger = logging.getLogger('mock_logger')
-        self.task = type('SubprocessMockTask', (object, ), {'plugin': None})
-        self.serialization_dict = {'context_cls': self.__class__, 'context': 
{}}
-
-    def __getattr__(self, item):
-        return None
-
-    @classmethod
-    def deserialize_from_dict(cls, **kwargs):
-        return cls()

Reply via email to