Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner d0a45edad 
-> 705e81bcf


review 1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/705e81bc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/705e81bc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/705e81bc

Branch: refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner
Commit: 705e81bcff20e3631e93ed9c3276406f3139bd46
Parents: d0a45ed
Author: max-orlov <[email protected]>
Authored: Tue Nov 21 17:17:57 2017 +0200
Committer: max-orlov <[email protected]>
Committed: Tue Nov 21 17:17:57 2017 +0200

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 |  10 +-
 aria/orchestrator/execution_compiler.py         | 161 ------------------
 aria/orchestrator/execution_preparer.py         | 170 +++++++++++++++++++
 docs/aria.orchestrator.rst                      |   4 +-
 .../execution/test_execution_compiler.py        |  60 +++----
 .../orchestrator/execution_plugin/test_local.py |   3 +-
 6 files changed, 208 insertions(+), 200 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index de030c6..2415e19 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -25,7 +25,7 @@ from .. import utils
 from .. import logger as cli_logger
 from .. import execution_logging
 from ..core import aria
-from ...orchestrator import execution_compiler
+from ...orchestrator import execution_preparer
 from ...modeling.models import Execution
 from ...orchestrator.workflows.core.engine import Engine
 from ...orchestrator.workflows.executor.dry import DryExecutor
@@ -145,14 +145,14 @@ def start(workflow_name,
     service = model_storage.service.get_by_name(service_name)
     executor = DryExecutor() if dry else 
ProcessExecutor(plugin_manager=plugin_manager)
 
-    compiler = execution_compiler.ExecutionCompiler(
+    preparer = execution_preparer.ExecutionPreparer(
         model_storage,
         resource_storage,
         plugin_manager,
         service,
         workflow_name
     )
-    workflow_ctx = compiler.compile(inputs, executor=executor)
+    workflow_ctx = preparer.prepare(inputs, executor=executor)
 
     engine = Engine(executor)
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if 
dry else ''))
@@ -194,13 +194,13 @@ def resume(execution_id,
                     .format(execution=execution_to_resume))
         return
 
-    workflow_ctx = execution_compiler.ExecutionCompiler(
+    workflow_ctx = execution_preparer.ExecutionPreparer(
         model_storage,
         resource_storage,
         plugin_manager,
         execution_to_resume.service,
         execution_to_resume.workflow_name
-    ).compile(execution_id=execution_to_resume.id)
+    ).prepare(execution_id=execution_to_resume.id)
 
     engine = Engine(executor)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/aria/orchestrator/execution_compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_compiler.py 
b/aria/orchestrator/execution_compiler.py
deleted file mode 100644
index f86e6b3..0000000
--- a/aria/orchestrator/execution_compiler.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import os
-import sys
-from datetime import datetime
-
-from . import exceptions
-from .context.workflow import WorkflowContext
-from .workflows import builtin
-from .workflows.core import graph_compiler
-from .workflows.executor.process import ProcessExecutor
-from ..modeling import models
-from ..modeling import utils as modeling_utils
-from ..utils.imports import import_fullname
-
-
-DEFAULT_TASK_MAX_ATTEMPTS = 30
-DEFAULT_TASK_RETRY_INTERVAL = 30
-
-
-class ExecutionCompiler(object):
-    def __init__(
-            self,
-            model,
-            resource,
-            plugin,
-            service,
-            workflow_name,
-            task_max_attempts=None,
-            task_retry_interval=None
-    ):
-        self._model = model
-        self._resource = resource
-        self._plugin = plugin
-        self._service = service
-        self._workflow_name = workflow_name
-        self._workflow_context = None
-        self._execution = None
-        self._task_max_attempts = task_max_attempts or 
DEFAULT_TASK_MAX_ATTEMPTS
-        self._task_retry_interval = task_retry_interval or 
DEFAULT_TASK_RETRY_INTERVAL
-
-    @property
-    def workflow_ctx(self):
-        if self._workflow_context is None:
-            self._workflow_context = WorkflowContext(
-                name=self.__class__.__name__,
-                model_storage=self._model,
-                resource_storage=self._resource,
-                service_id=self._execution.service.id,
-                execution_id=self._execution.id,
-                workflow_name=self._execution.workflow_name,
-                task_max_attempts=self._task_max_attempts,
-                task_retry_interval=self._task_retry_interval,
-            )
-        return self._workflow_context
-
-    def compile(self, execution_inputs=None, executor=None, execution_id=None):
-        assert not (execution_inputs and executor and execution_id)
-
-        if execution_id is None:
-            # If the execution is new
-            self._execution = self._create_execution_model(execution_inputs)
-            self._model.execution.put(self._execution)
-            self._create_tasks(executor)
-            self._model.execution.update(self._execution)
-        else:
-            # If resuming an execution
-            self._execution = self._model.execution.get(execution_id)
-
-        return self.workflow_ctx
-
-    def _create_tasks(self, executor=None):
-
-        # Set default executor and kwargs
-        executor = executor or ProcessExecutor(plugin_manager=self._plugin)
-
-        # transforming the execution inputs to dict, to pass them to the 
workflow function
-        execution_inputs_dict = dict(inp.unwrapped for inp in 
self._execution.inputs.itervalues())
-
-        if len(self._execution.tasks) == 0:
-            workflow_fn = self._get_workflow_fn(self._execution.workflow_name)
-            tasks_graph = workflow_fn(ctx=self.workflow_ctx, 
**execution_inputs_dict)
-            compiler = graph_compiler.GraphCompiler(self.workflow_ctx, 
executor.__class__)
-            compiler.compile(tasks_graph)
-
-    def _create_execution_model(self, inputs=None):
-        self._validate_workflow_exists_for_service()
-        self._validate_no_active_executions()
-
-        execution = models.Execution(
-            created_at=datetime.utcnow(),
-            service_fk=self._service.id,
-            workflow_name=self._workflow_name,
-            inputs={})
-
-        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
-            workflow_inputs = dict()  # built-in workflows don't have any 
inputs
-        else:
-            workflow_inputs = 
self._service.workflows[self._workflow_name].inputs
-
-        
modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
-                                                     supplied_inputs=inputs or 
{})
-        
modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
-                                                             
supplied_inputs=inputs or {})
-        execution.inputs = modeling_utils.merge_parameter_values(
-            inputs, workflow_inputs, model_cls=models.Input)
-
-        return execution
-
-    def _validate_no_active_executions(self):
-        active_executions = [e for e in self._service.executions if
-                             e.is_active()]
-        if active_executions:
-            raise exceptions.ActiveExecutionsError(
-                "Can't start execution; Service {0} has an active execution 
with ID {1}"
-                .format(self._service.name, active_executions[0].id))
-
-    def _validate_workflow_exists_for_service(self):
-        if self._workflow_name not in self._service.workflows and \
-                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
-            raise exceptions.UndeclaredWorkflowError(
-                'No workflow policy {0} declared in service {1}'
-                .format(self._workflow_name, self._service.name))
-
-    def _get_workflow_fn(self, workflow_name):
-        if workflow_name in builtin.BUILTIN_WORKFLOWS:
-            return 
import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
-                                                    workflow_name))
-
-        workflow = self._service.workflows[workflow_name]
-
-        # TODO: Custom workflow support needs improvement, currently this code 
uses internal
-        # knowledge of the resource storage; Instead, workflows should 
probably be loaded
-        # in a similar manner to operation plugins. Also consider passing to 
import_fullname
-        # as paths instead of appending to sys path.
-        service_template_resources_path = os.path.join(
-            self._resource.service_template.base_path,
-            str(self._service.service_template.id))
-        sys.path.append(service_template_resources_path)
-
-        try:
-            workflow_fn = import_fullname(workflow.function)
-        except ImportError:
-            raise exceptions.WorkflowImplementationNotFoundError(
-                'Could not find workflow {0} function at {1}'.format(
-                    workflow_name, workflow.function))
-
-        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/aria/orchestrator/execution_preparer.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_preparer.py 
b/aria/orchestrator/execution_preparer.py
new file mode 100644
index 0000000..c59ae44
--- /dev/null
+++ b/aria/orchestrator/execution_preparer.py
@@ -0,0 +1,170 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core import graph_compiler
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class ExecutionPreparer(object):
+    """
+    Manages the execution- and task-related preparation needed to run a
+    workflow.
+    """
+    def __init__(
+            self,
+            model_storage,
+            resource_storage,
+            plugin_manager,
+            service,
+            workflow_name,
+            task_max_attempts=None,
+            task_retry_interval=None
+    ):
+        self._model = model_storage
+        self._resource = resource_storage
+        self._plugin = plugin_manager
+        self._service = service
+        self._workflow_name = workflow_name
+        self._task_max_attempts = task_max_attempts or 
DEFAULT_TASK_MAX_ATTEMPTS
+        self._task_retry_interval = task_retry_interval or 
DEFAULT_TASK_RETRY_INTERVAL
+
+    def get_ctx(self, execution):
+        return WorkflowContext(
+            name=self._workflow_name,
+            model_storage=self._model,
+            resource_storage=self._resource,
+            service_id=execution.service.id,
+            execution_id=execution.id,
+            workflow_name=execution.workflow_name,
+            task_max_attempts=self._task_max_attempts,
+            task_retry_interval=self._task_retry_interval,
+        )
+
+    def prepare(self, execution_inputs=None, executor=None, execution_id=None):
+        """
+        Prepares the execution and returns the workflow context. If the
+        execution is new, execution and task models are created, and a
+        workflow context is created for that execution.
+
+        :param execution_inputs: inputs for the execution.
+        :param executor: the executor for the tasks.
+        :param execution_id: id of an existing execution (used mainly for
+         resuming).
+        :return: a workflow context for the execution.
+        """
+        assert not (execution_inputs and executor and execution_id)
+
+        if execution_id is None:
+            # If the execution is new
+            execution = self._create_execution_model(execution_inputs)
+            self._model.execution.put(execution)
+            ctx = self.get_ctx(execution)
+            self._create_tasks(ctx, executor)
+            self._model.execution.update(execution)
+        else:
+            # If resuming an execution
+            execution = self._model.execution.get(execution_id)
+            ctx = self.get_ctx(execution)
+
+        return ctx
+
+    def _create_tasks(self, ctx, executor=None):
+
+        # Set default executor and kwargs
+        executor = executor or ProcessExecutor(plugin_manager=self._plugin)
+
+        # transforming the execution inputs to dict, to pass them to the 
workflow function
+        execution_inputs_dict = dict(inp.unwrapped for inp in 
ctx.execution.inputs.itervalues())
+
+        workflow_fn = self._get_workflow_fn(ctx.execution.workflow_name)
+        api_tasks_graph = workflow_fn(ctx=ctx, **execution_inputs_dict)
+        compiler = graph_compiler.GraphCompiler(ctx, executor.__class__)
+        compiler.compile(api_tasks_graph)
+
+    def _create_execution_model(self, inputs=None):
+        self._validate_workflow_exists_for_service()
+        self._validate_no_active_executions()
+
+        execution = models.Execution(
+            created_at=datetime.utcnow(),
+            service_fk=self._service.id,
+            workflow_name=self._workflow_name,
+            inputs={})
+
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            workflow_inputs = dict()  # built-in workflows don't have any 
inputs
+        else:
+            workflow_inputs = 
self._service.workflows[self._workflow_name].inputs
+
+        
modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs,
+                                                     supplied_inputs=inputs or 
{})
+        
modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs,
+                                                             
supplied_inputs=inputs or {})
+        execution.inputs = modeling_utils.merge_parameter_values(
+            inputs, workflow_inputs, model_cls=models.Input)
+
+        return execution
+
+    def _validate_no_active_executions(self):
+        active_executions = [e for e in self._service.executions if
+                             e.is_active()]
+        if active_executions:
+            raise exceptions.ActiveExecutionsError(
+                "Can't start execution; Service {0} has an active execution 
with ID {1}"
+                .format(self._service.name, active_executions[0].id))
+
+    def _validate_workflow_exists_for_service(self):
+        if self._workflow_name not in self._service.workflows and \
+                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+            raise exceptions.UndeclaredWorkflowError(
+                'No workflow policy {0} declared in service {1}'
+                .format(self._workflow_name, self._service.name))
+
+    def _get_workflow_fn(self, workflow_name):
+        if workflow_name in builtin.BUILTIN_WORKFLOWS:
+            return 
import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+                                                    workflow_name))
+
+        workflow = self._service.workflows[workflow_name]
+
+        # TODO: Custom workflow support needs improvement, currently this code 
uses internal
+        # knowledge of the resource storage; Instead, workflows should 
probably be loaded
+        # in a similar manner to operation plugins. Also consider passing to 
import_fullname
+        # as paths instead of appending to sys path.
+        service_template_resources_path = os.path.join(
+            self._resource.service_template.base_path,
+            str(self._service.service_template.id))
+        sys.path.append(service_template_resources_path)
+
+        try:
+            workflow_fn = import_fullname(workflow.function)
+        except ImportError:
+            raise exceptions.WorkflowImplementationNotFoundError(
+                'Could not find workflow {0} function at {1}'.format(
+                    workflow_name, workflow.function))
+
+        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/docs/aria.orchestrator.rst
----------------------------------------------------------------------
diff --git a/docs/aria.orchestrator.rst b/docs/aria.orchestrator.rst
index 5d7eda6..6e6d659 100644
--- a/docs/aria.orchestrator.rst
+++ b/docs/aria.orchestrator.rst
@@ -40,7 +40,7 @@
 
 .. automodule:: aria.orchestrator.plugin
 
-:mod:`aria.orchestrator.execution_compiler`
+:mod:`aria.orchestrator.execution_preparer`
 -------------------------------------------
 
-.. automodule:: aria.orchestrator.execution_compiler
+.. automodule:: aria.orchestrator.execution_preparer

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/tests/orchestrator/execution/test_execution_compiler.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution/test_execution_compiler.py 
b/tests/orchestrator/execution/test_execution_compiler.py
index 6062686..42e5272 100644
--- a/tests/orchestrator/execution/test_execution_compiler.py
+++ b/tests/orchestrator/execution/test_execution_compiler.py
@@ -24,7 +24,7 @@ from aria.modeling import exceptions as modeling_exceptions
 from aria.modeling import models
 from aria.orchestrator import exceptions
 from aria.orchestrator import events
-from aria.orchestrator import execution_compiler
+from aria.orchestrator import execution_preparer
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.core import engine, graph_compiler
 from aria.orchestrator.workflows.executor import thread
@@ -64,7 +64,7 @@ class FailingTask(BaseException):
 def test_undeclared_workflow(request):
     # validating a proper error is raised when the workflow is not declared in 
the service
     with pytest.raises(exceptions.UndeclaredWorkflowError):
-        _get_compiler(request, 'undeclared_workflow').compile()
+        _get_preparer(request, 'undeclared_workflow').prepare()
 
 
 def test_missing_workflow_implementation(service, request):
@@ -76,13 +76,13 @@ def test_missing_workflow_implementation(service, request):
     service.workflows['test_workflow'] = workflow
 
     with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
-        _get_compiler(request, 'test_workflow').compile()
+        _get_preparer(request, 'test_workflow').prepare()
 
 
-def test_builtin_workflow_instantiation(request, model):
+def test_builtin_workflow_instantiation(request):
     # validates the workflow runner instantiates properly when provided with a 
builtin workflow
     # (expecting no errors to be raised on undeclared workflow or missing 
workflow implementation)
-    workflow_ctx = _get_compiler(request, 'install').compile()
+    workflow_ctx = _get_preparer(request, 'install').prepare()
     assert len(workflow_ctx.execution.tasks) == 18  # expecting 18 tasks for 2 
node topology
 
 
@@ -90,7 +90,7 @@ def test_custom_workflow_instantiation(request):
     # validates the workflow runner instantiates properly when provided with a 
custom workflow
     # (expecting no errors to be raised on undeclared workflow or missing 
workflow implementation)
     mock_workflow = _setup_mock_workflow_in_service(request)
-    workflow_ctx = _get_compiler(request, mock_workflow).compile()
+    workflow_ctx = _get_preparer(request, mock_workflow).prepare()
     assert len(workflow_ctx.execution.tasks) == 2   # mock workflow creates 
only start workflow
                                                     # and end workflow task
 
@@ -102,7 +102,7 @@ def test_existing_active_executions(request, service, 
model):
         workflow_name='uninstall')
     model.execution.put(existing_active_execution)
     with pytest.raises(exceptions.ActiveExecutionsError):
-        _get_compiler(request, 'install').compile()
+        _get_preparer(request, 'install').prepare()
 
 
 def test_existing_executions_but_no_active_ones(request, service, model):
@@ -112,13 +112,13 @@ def test_existing_executions_but_no_active_ones(request, 
service, model):
         workflow_name='uninstall')
     model.execution.put(existing_terminated_execution)
     # no active executions exist, so no error should be raised
-    _get_compiler(request, 'install').compile()
+    _get_preparer(request, 'install').prepare()
 
 
 def test_execution_model_creation(request, service):
     mock_workflow = _setup_mock_workflow_in_service(request)
 
-    workflow_ctx = _get_compiler(request, mock_workflow).compile()
+    workflow_ctx = _get_preparer(request, mock_workflow).prepare()
 
     assert workflow_ctx.execution.service.id == service.id
     assert workflow_ctx.execution.workflow_name == mock_workflow
@@ -133,7 +133,7 @@ def test_execution_inputs_override_workflow_inputs(request):
         inputs=dict((name, models.Input.wrap(name, val)) for name, val
                     in wf_inputs.iteritems()))
 
-    workflow_ctx = _get_compiler(request, mock_workflow).compile(
+    workflow_ctx = _get_preparer(request, mock_workflow).prepare(
         execution_inputs={'input2': 'overriding-value2', 'input3': 7}
     )
 
@@ -150,7 +150,7 @@ def test_execution_inputs_undeclared_inputs(request):
     mock_workflow = _setup_mock_workflow_in_service(request)
 
     with pytest.raises(modeling_exceptions.UndeclaredInputsException):
-        _get_compiler(request, mock_workflow).compile(
+        _get_preparer(request, mock_workflow).prepare(
             execution_inputs={'undeclared_input': 'value'})
 
 
@@ -159,7 +159,7 @@ def test_execution_inputs_missing_required_inputs(request):
         request, inputs={'required_input': models.Input.wrap('required_input', 
value=None)})
 
     with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
-        _get_compiler(request, mock_workflow).compile(execution_inputs={})
+        _get_preparer(request, mock_workflow).prepare(execution_inputs={})
 
 
 def test_execution_inputs_wrong_type_inputs(request):
@@ -167,13 +167,13 @@ def test_execution_inputs_wrong_type_inputs(request):
         request, inputs={'input': models.Input.wrap('input', 'value')})
 
     with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
-        _get_compiler(request, 
mock_workflow).compile(execution_inputs={'input': 5})
+        _get_preparer(request, 
mock_workflow).prepare(execution_inputs={'input': 5})
 
 
 def test_execution_inputs_builtin_workflow_with_inputs(request):
     # built-in workflows don't have inputs
     with pytest.raises(modeling_exceptions.UndeclaredInputsException):
-        _get_compiler(request, 
'install').compile(execution_inputs={'undeclared_input': 'value'})
+        _get_preparer(request, 
'install').prepare(execution_inputs={'undeclared_input': 'value'})
 
 
 def test_workflow_function_parameters(request, tmpdir):
@@ -188,7 +188,7 @@ def test_workflow_function_parameters(request, tmpdir):
         request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
                              in wf_inputs.iteritems()))
 
-    _get_compiler(request, mock_workflow).compile(
+    _get_preparer(request, mock_workflow).prepare(
         execution_inputs={'input2': 'overriding-value2', 'input3': 7})
 
     with open(output_path) as f:
@@ -230,14 +230,14 @@ def _setup_mock_workflow_in_service(request, inputs=None):
     return mock_workflow_name
 
 
-def _get_compiler(request, workflow_name):
+def _get_preparer(request, workflow_name):
     # helper method for instantiating a workflow runner
     service = request.getfixturevalue('service')
     model = request.getfixturevalue('model')
     resource = request.getfixturevalue('resource')
     plugin_manager = request.getfixturevalue('plugin_manager')
 
-    return execution_compiler.ExecutionCompiler(
+    return execution_preparer.ExecutionPreparer(
         model,
         resource,
         plugin_manager,
@@ -248,7 +248,7 @@ def _get_compiler(request, workflow_name):
 
 class TestResumableWorkflows(object):
 
-    def _compile_execution(
+    def _prepare_execution_and_get_workflow_ctx(
             self,
             model,
             resource,
@@ -265,16 +265,16 @@ class TestResumableWorkflows(object):
             }
         )
         model.service.update(service)
-        compiler = execution_compiler.ExecutionCompiler(
+        preparer = execution_preparer.ExecutionPreparer(
             model, resource, None, service, 'custom_workflow'
         )
-        ctx = compiler.compile(inputs, executor)
+        ctx = preparer.prepare(inputs, executor)
         model.execution.update(ctx.execution)
 
         return ctx
 
     @staticmethod
-    def _wait_for_active_and_cancel(eng, ctx):
+    def _cancel_active_execution(eng, ctx):
         if custom_events['is_active'].wait(60) is False:
             raise TimeoutError("is_active wasn't set to True")
         eng.cancel_execution(ctx)
@@ -285,7 +285,7 @@ class TestResumableWorkflows(object):
         node = 
workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 
0)
         self._create_interface(workflow_context, node, 
mock_pass_first_task_only)
-        ctx = self._compile_execution(
+        ctx = self._prepare_execution_and_get_workflow_ctx(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -301,7 +301,7 @@ class TestResumableWorkflows(object):
         wf_thread.start()
 
         # Wait for the execution to start
-        self._wait_for_active_and_cancel(eng, ctx)
+        self._cancel_active_execution(eng, ctx)
         node = ctx.model.node.refresh(node)
 
         tasks = ctx.model.task.list(filters={'_stub_type': None})
@@ -310,7 +310,7 @@ class TestResumableWorkflows(object):
         custom_events['is_resumed'].set()
         assert any(task.status == task.RETRYING for task in tasks)
 
-        # Create a new workflow runner, with an existing execution id. This 
would cause
+        # Create a new workflow engine, with an existing execution id. This 
would cause
         # the old execution to restart.
         new_engine = engine.Engine(thread_executor)
         new_engine.execute(ctx, resuming=True)
@@ -326,7 +326,7 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 
0)
         self._create_interface(workflow_context, node, mock_stuck_task)
 
-        ctx = self._compile_execution(
+        ctx = self._prepare_execution_and_get_workflow_ctx(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -340,7 +340,7 @@ class TestResumableWorkflows(object):
         wf_thread.daemon = True
         wf_thread.start()
 
-        self._wait_for_active_and_cancel(eng, ctx)
+        self._cancel_active_execution(eng, ctx)
         node = workflow_context.model.node.refresh(node)
         task = workflow_context.model.task.list(filters={'_stub_type': 
None})[0]
         assert node.attributes['invocations'].value == 1
@@ -366,7 +366,7 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 
0)
         self._create_interface(workflow_context, node, 
mock_failed_before_resuming)
 
-        ctx = self._compile_execution(
+        ctx = self._prepare_execution_and_get_workflow_ctx(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -378,7 +378,7 @@ class TestResumableWorkflows(object):
         wf_thread.setDaemon(True)
         wf_thread.start()
 
-        self._wait_for_active_and_cancel(eng, ctx)
+        self._cancel_active_execution(eng, ctx)
         node = workflow_context.model.node.refresh(node)
 
         task = workflow_context.model.task.list(filters={'_stub_type': 
None})[0]
@@ -409,7 +409,7 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 
0)
         self._create_interface(workflow_context, node, 
mock_pass_first_task_only)
 
-        ctx = self._compile_execution(
+        ctx = self._prepare_execution_and_get_workflow_ctx(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],
@@ -457,7 +457,7 @@ class TestResumableWorkflows(object):
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 
0)
         self._create_interface(workflow_context, node, 
mock_fail_first_task_only)
 
-        ctx = self._compile_execution(
+        ctx = self._prepare_execution_and_get_workflow_ctx(
             workflow_context.model,
             workflow_context.resource,
             workflow_context.model.service.list()[0],

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py 
b/tests/orchestrator/execution_plugin/test_local.py
index fad05de..5af6a2f 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -500,8 +500,7 @@ if __name__ == '__main__':
                 arguments=arguments))
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: 
disable=no-value-for-parameter
-        graph_compiler.GraphCompiler(workflow_context, 
executor.__class__).compile(
-            tasks_graph)
+        graph_compiler.GraphCompiler(workflow_context, 
executor.__class__).compile(tasks_graph)
         eng = engine.Engine(executor)
         eng.execute(workflow_context)
         return workflow_context.model.node.get_by_name(

Reply via email to