Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner d0a45edad -> 705e81bcf
review 1 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/705e81bc Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/705e81bc Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/705e81bc Branch: refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner Commit: 705e81bcff20e3631e93ed9c3276406f3139bd46 Parents: d0a45ed Author: max-orlov <[email protected]> Authored: Tue Nov 21 17:17:57 2017 +0200 Committer: max-orlov <[email protected]> Committed: Tue Nov 21 17:17:57 2017 +0200 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 10 +- aria/orchestrator/execution_compiler.py | 161 ------------------ aria/orchestrator/execution_preparer.py | 170 +++++++++++++++++++ docs/aria.orchestrator.rst | 4 +- .../execution/test_execution_compiler.py | 60 +++---- .../orchestrator/execution_plugin/test_local.py | 3 +- 6 files changed, 208 insertions(+), 200 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index de030c6..2415e19 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -25,7 +25,7 @@ from .. import utils from .. import logger as cli_logger from .. import execution_logging from ..core import aria -from ...orchestrator import execution_compiler +from ...orchestrator import execution_preparer from ...modeling.models import Execution from ...orchestrator.workflows.core.engine import Engine from ...orchestrator.workflows.executor.dry import DryExecutor @@ -145,14 +145,14 @@ def start(workflow_name, service = model_storage.service.get_by_name(service_name) executor = DryExecutor() if dry else ProcessExecutor(plugin_manager=plugin_manager) - compiler = execution_compiler.ExecutionCompiler( + compiler = execution_preparer.ExecutionPreparer( model_storage, resource_storage, plugin_manager, service, workflow_name ) - workflow_ctx = compiler.compile(inputs, executor=executor) + workflow_ctx = compiler.prepare(inputs, executor=executor) engine = Engine(executor) logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) @@ -194,13 +194,13 @@ def resume(execution_id, .format(execution=execution_to_resume)) return - workflow_ctx = execution_compiler.ExecutionCompiler( + workflow_ctx = execution_preparer.ExecutionPreparer( model_storage, resource_storage, plugin_manager, execution_to_resume.service, execution_to_resume.workflow_name - ).compile(execution_id=execution_to_resume.id) + ).prepare(execution_id=execution_to_resume.id) engine = Engine(executor) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/aria/orchestrator/execution_compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_compiler.py b/aria/orchestrator/execution_compiler.py deleted file mode 100644 index f86e6b3..0000000 --- a/aria/orchestrator/execution_compiler.py +++ /dev/null @@ -1,161 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys -from datetime import datetime - -from . import exceptions -from .context.workflow import WorkflowContext -from .workflows import builtin -from .workflows.core import graph_compiler -from .workflows.executor.process import ProcessExecutor -from ..modeling import models -from ..modeling import utils as modeling_utils -from ..utils.imports import import_fullname - - -DEFAULT_TASK_MAX_ATTEMPTS = 30 -DEFAULT_TASK_RETRY_INTERVAL = 30 - - -class ExecutionCompiler(object): - def __init__( - self, - model, - resource, - plugin, - service, - workflow_name, - task_max_attempts=None, - task_retry_interval=None - ): - self._model = model - self._resource = resource - self._plugin = plugin - self._service = service - self._workflow_name = workflow_name - self._workflow_context = None - self._execution = None - self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS - self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL - - @property - def workflow_ctx(self): - if self._workflow_context is None: - self._workflow_context = WorkflowContext( - name=self.__class__.__name__, - model_storage=self._model, - resource_storage=self._resource, - service_id=self._execution.service.id, - execution_id=self._execution.id, - workflow_name=self._execution.workflow_name, - task_max_attempts=self._task_max_attempts, - task_retry_interval=self._task_retry_interval, - ) - return self._workflow_context - - def compile(self, execution_inputs=None, executor=None, execution_id=None): - assert not (execution_inputs and executor and execution_id) - - if execution_id is None: - # If the execution is new - self._execution = self._create_execution_model(execution_inputs) - self._model.execution.put(self._execution) - self._create_tasks(executor) - self._model.execution.update(self._execution) - else: - # If resuming an execution - self._execution = self._model.execution.get(execution_id) - - return self.workflow_ctx - - def _create_tasks(self, executor=None): - - # Set default executor and kwargs - executor = executor or ProcessExecutor(plugin_manager=self._plugin) - - # transforming the execution inputs to dict, to pass them to the workflow function - execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues()) - - if len(self._execution.tasks) == 0: - workflow_fn = self._get_workflow_fn(self._execution.workflow_name) - tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict) - compiler = graph_compiler.GraphCompiler(self.workflow_ctx, executor.__class__) - compiler.compile(tasks_graph) - - def _create_execution_model(self, inputs=None): - self._validate_workflow_exists_for_service() - self._validate_no_active_executions() - - execution = models.Execution( - created_at=datetime.utcnow(), - service_fk=self._service.id, - workflow_name=self._workflow_name, - inputs={}) - - if self._workflow_name in builtin.BUILTIN_WORKFLOWS: - workflow_inputs = dict() # built-in workflows don't have any inputs - else: - workflow_inputs = self._service.workflows[self._workflow_name].inputs - - modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - execution.inputs = modeling_utils.merge_parameter_values( - inputs, workflow_inputs, model_cls=models.Input) - - return execution - - def _validate_no_active_executions(self): - active_executions = [e for e in self._service.executions if - e.is_active()] - if active_executions: - raise exceptions.ActiveExecutionsError( - "Can't start execution; Service {0} has an active execution with ID {1}" - .format(self._service.name, active_executions[0].id)) - - def _validate_workflow_exists_for_service(self): - if self._workflow_name not in self._service.workflows and \ - self._workflow_name not in builtin.BUILTIN_WORKFLOWS: - raise exceptions.UndeclaredWorkflowError( - 'No workflow policy {0} declared in service {1}' - .format(self._workflow_name, self._service.name)) - - def _get_workflow_fn(self, workflow_name): - if workflow_name in builtin.BUILTIN_WORKFLOWS: - return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, - workflow_name)) - - workflow = self._service.workflows[workflow_name] - - # TODO: Custom workflow support needs improvement, currently this code uses internal - # knowledge of the resource storage; Instead, workflows should probably be loaded - # in a similar manner to operation plugins. Also consider passing to import_fullname - # as paths instead of appending to sys path. - service_template_resources_path = os.path.join( - self._resource.service_template.base_path, - str(self._service.service_template.id)) - sys.path.append(service_template_resources_path) - - try: - workflow_fn = import_fullname(workflow.function) - except ImportError: - raise exceptions.WorkflowImplementationNotFoundError( - 'Could not find workflow {0} function at {1}'.format( - workflow_name, workflow.function)) - - return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/aria/orchestrator/execution_preparer.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_preparer.py b/aria/orchestrator/execution_preparer.py new file mode 100644 index 0000000..c59ae44 --- /dev/null +++ b/aria/orchestrator/execution_preparer.py @@ -0,0 +1,170 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +from datetime import datetime + +from . import exceptions +from .context.workflow import WorkflowContext +from .workflows import builtin +from .workflows.core import graph_compiler +from .workflows.executor.process import ProcessExecutor +from ..modeling import models +from ..modeling import utils as modeling_utils +from ..utils.imports import import_fullname + + +DEFAULT_TASK_MAX_ATTEMPTS = 30 +DEFAULT_TASK_RETRY_INTERVAL = 30 + + +class ExecutionPreparer(object): + """ + This class manages any execution and tasks related preparation for an execution of a workflow. + """ + def __init__( + self, + model_storage, + resource_storagee, + plugin_manager, + service, + workflow_name, + task_max_attempts=None, + task_retry_interval=None + ): + self._model = model_storage + self._resource = resource_storagee + self._plugin = plugin_manager + self._service = service + self._workflow_name = workflow_name + self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS + self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL + + def get_ctx(self, execution): + return WorkflowContext( + name=self._workflow_name, + model_storage=self._model, + resource_storage=self._resource, + service_id=execution.service.id, + execution_id=execution.id, + workflow_name=execution.workflow_name, + task_max_attempts=self._task_max_attempts, + task_retry_interval=self._task_retry_interval, + ) + + def prepare(self, execution_inputs=None, executor=None, execution_id=None): + """ + Prepares the execution and return the workflow ctx. If the execution is new, an execution + and tasks models would be created. A workflow context for the appropriate execution would + be created. + + :param execution_inputs: Inputs for the execution. + :param executor: the execution for the tasks + :param execution_id: used for an existing execution (mainly for resuming). + :return: + """ + assert not (execution_inputs and executor and execution_id) + + if execution_id is None: + # If the execution is new + execution = self._create_execution_model(execution_inputs) + self._model.execution.put(execution) + ctx = self.get_ctx(execution) + self._create_tasks(ctx, executor) + self._model.execution.update(execution) + else: + # If resuming an execution + execution = self._model.execution.get(execution_id) + ctx = self.get_ctx(execution) + + return ctx + + def _create_tasks(self, ctx, executor=None): + + # Set default executor and kwargs + executor = executor or ProcessExecutor(plugin_manager=self._plugin) + + # transforming the execution inputs to dict, to pass them to the workflow function + execution_inputs_dict = dict(inp.unwrapped for inp in ctx.execution.inputs.itervalues()) + + workflow_fn = self._get_workflow_fn(ctx.execution.workflow_name) + api_tasks_graph = workflow_fn(ctx=ctx, **execution_inputs_dict) + compiler = graph_compiler.GraphCompiler(ctx, executor.__class__) + compiler.compile(api_tasks_graph) + + def _create_execution_model(self, inputs=None): + self._validate_workflow_exists_for_service() + self._validate_no_active_executions() + + execution = models.Execution( + created_at=datetime.utcnow(), + service_fk=self._service.id, + workflow_name=self._workflow_name, + inputs={}) + + if self._workflow_name in builtin.BUILTIN_WORKFLOWS: + workflow_inputs = dict() # built-in workflows don't have any inputs + else: + workflow_inputs = self._service.workflows[self._workflow_name].inputs + + modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + execution.inputs = modeling_utils.merge_parameter_values( + inputs, workflow_inputs, model_cls=models.Input) + + return execution + + def _validate_no_active_executions(self): + active_executions = [e for e in self._service.executions if + e.is_active()] + if active_executions: + raise exceptions.ActiveExecutionsError( + "Can't start execution; Service {0} has an active execution with ID {1}" + .format(self._service.name, active_executions[0].id)) + + def _validate_workflow_exists_for_service(self): + if self._workflow_name not in self._service.workflows and \ + self._workflow_name not in builtin.BUILTIN_WORKFLOWS: + raise exceptions.UndeclaredWorkflowError( + 'No workflow policy {0} declared in service {1}' + .format(self._workflow_name, self._service.name)) + + def _get_workflow_fn(self, workflow_name): + if workflow_name in builtin.BUILTIN_WORKFLOWS: + return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, + workflow_name)) + + workflow = self._service.workflows[workflow_name] + + # TODO: Custom workflow support needs improvement, currently this code uses internal + # knowledge of the resource storage; Instead, workflows should probably be loaded + # in a similar manner to operation plugins. Also consider passing to import_fullname + # as paths instead of appending to sys path. + service_template_resources_path = os.path.join( + self._resource.service_template.base_path, + str(self._service.service_template.id)) + sys.path.append(service_template_resources_path) + + try: + workflow_fn = import_fullname(workflow.function) + except ImportError: + raise exceptions.WorkflowImplementationNotFoundError( + 'Could not find workflow {0} function at {1}'.format( + workflow_name, workflow.function)) + + return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/docs/aria.orchestrator.rst ---------------------------------------------------------------------- diff --git a/docs/aria.orchestrator.rst b/docs/aria.orchestrator.rst index 5d7eda6..6e6d659 100644 --- a/docs/aria.orchestrator.rst +++ b/docs/aria.orchestrator.rst @@ -40,7 +40,7 @@ .. automodule:: aria.orchestrator.plugin -:mod:`aria.orchestrator.execution_compiler` +:mod:`aria.orchestrator.execution_preparer` ------------------------------------------- -.. automodule:: aria.orchestrator.execution_compiler +.. automodule:: aria.orchestrator.execution_preparer http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/tests/orchestrator/execution/test_execution_compiler.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution/test_execution_compiler.py b/tests/orchestrator/execution/test_execution_compiler.py index 6062686..42e5272 100644 --- a/tests/orchestrator/execution/test_execution_compiler.py +++ b/tests/orchestrator/execution/test_execution_compiler.py @@ -24,7 +24,7 @@ from aria.modeling import exceptions as modeling_exceptions from aria.modeling import models from aria.orchestrator import exceptions from aria.orchestrator import events -from aria.orchestrator import execution_compiler +from aria.orchestrator import execution_preparer from aria.orchestrator.workflows import api from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import thread @@ -64,7 +64,7 @@ class FailingTask(BaseException): def test_undeclared_workflow(request): # validating a proper error is raised when the workflow is not declared in the service with pytest.raises(exceptions.UndeclaredWorkflowError): - _get_compiler(request, 'undeclared_workflow').compile() + _get_preparer(request, 'undeclared_workflow').prepare() def test_missing_workflow_implementation(service, request): @@ -76,13 +76,13 @@ def test_missing_workflow_implementation(service, request): service.workflows['test_workflow'] = workflow with pytest.raises(exceptions.WorkflowImplementationNotFoundError): - _get_compiler(request, 'test_workflow').compile() + _get_preparer(request, 'test_workflow').prepare() -def test_builtin_workflow_instantiation(request, model): +def test_builtin_workflow_instantiation(request): # validates the workflow runner instantiates properly when provided with a builtin workflow # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) - workflow_ctx = _get_compiler(request, 'install').compile() + workflow_ctx = _get_preparer(request, 'install').prepare() assert len(workflow_ctx.execution.tasks) == 18 # expecting 18 tasks for 2 node topology @@ -90,7 +90,7 @@ def test_custom_workflow_instantiation(request): # validates the workflow runner instantiates properly when provided with a custom workflow # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) mock_workflow = _setup_mock_workflow_in_service(request) - workflow_ctx = _get_compiler(request, mock_workflow).compile() + workflow_ctx = _get_preparer(request, mock_workflow).prepare() assert len(workflow_ctx.execution.tasks) == 2 # mock workflow creates only start workflow # and end workflow task @@ -102,7 +102,7 @@ def test_existing_active_executions(request, service, model): workflow_name='uninstall') model.execution.put(existing_active_execution) with pytest.raises(exceptions.ActiveExecutionsError): - _get_compiler(request, 'install').compile() + _get_preparer(request, 'install').prepare() def test_existing_executions_but_no_active_ones(request, service, model): @@ -112,13 +112,13 @@ def test_existing_executions_but_no_active_ones(request, service, model): workflow_name='uninstall') model.execution.put(existing_terminated_execution) # no active executions exist, so no error should be raised - _get_compiler(request, 'install').compile() + _get_preparer(request, 'install').prepare() def test_execution_model_creation(request, service): mock_workflow = _setup_mock_workflow_in_service(request) - workflow_ctx = _get_compiler(request, mock_workflow).compile() + workflow_ctx = _get_preparer(request, mock_workflow).prepare() assert workflow_ctx.execution.service.id == service.id assert workflow_ctx.execution.workflow_name == mock_workflow @@ -133,7 +133,7 @@ def test_execution_inputs_override_workflow_inputs(request): inputs=dict((name, models.Input.wrap(name, val)) for name, val in wf_inputs.iteritems())) - workflow_ctx = _get_compiler(request, mock_workflow).compile( + workflow_ctx = _get_preparer(request, mock_workflow).prepare( execution_inputs={'input2': 'overriding-value2', 'input3': 7} ) @@ -150,7 +150,7 @@ def test_execution_inputs_undeclared_inputs(request): mock_workflow = _setup_mock_workflow_in_service(request) with pytest.raises(modeling_exceptions.UndeclaredInputsException): - _get_compiler(request, mock_workflow).compile( + _get_preparer(request, mock_workflow).prepare( execution_inputs={'undeclared_input': 'value'}) @@ -159,7 +159,7 @@ def test_execution_inputs_missing_required_inputs(request): request, inputs={'required_input': models.Input.wrap('required_input', value=None)}) with pytest.raises(modeling_exceptions.MissingRequiredInputsException): - _get_compiler(request, mock_workflow).compile(execution_inputs={}) + _get_preparer(request, mock_workflow).prepare(execution_inputs={}) def test_execution_inputs_wrong_type_inputs(request): @@ -167,13 +167,13 @@ def test_execution_inputs_wrong_type_inputs(request): request, inputs={'input': models.Input.wrap('input', 'value')}) with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException): - _get_compiler(request, mock_workflow).compile(execution_inputs={'input': 5}) + _get_preparer(request, mock_workflow).prepare(execution_inputs={'input': 5}) def test_execution_inputs_builtin_workflow_with_inputs(request): # built-in workflows don't have inputs with pytest.raises(modeling_exceptions.UndeclaredInputsException): - _get_compiler(request, 'install').compile(execution_inputs={'undeclared_input': 'value'}) + _get_preparer(request, 'install').prepare(execution_inputs={'undeclared_input': 'value'}) def test_workflow_function_parameters(request, tmpdir): @@ -188,7 +188,7 @@ def test_workflow_function_parameters(request, tmpdir): request, inputs=dict((name, models.Input.wrap(name, val)) for name, val in wf_inputs.iteritems())) - _get_compiler(request, mock_workflow).compile( + _get_preparer(request, mock_workflow).prepare( execution_inputs={'input2': 'overriding-value2', 'input3': 7}) with open(output_path) as f: @@ -230,14 +230,14 @@ def _setup_mock_workflow_in_service(request, inputs=None): return mock_workflow_name -def _get_compiler(request, workflow_name): +def _get_preparer(request, workflow_name): # helper method for instantiating a workflow runner service = request.getfixturevalue('service') model = request.getfixturevalue('model') resource = request.getfixturevalue('resource') plugin_manager = request.getfixturevalue('plugin_manager') - return execution_compiler.ExecutionCompiler( + return execution_preparer.ExecutionPreparer( model, resource, plugin_manager, @@ -248,7 +248,7 @@ def _get_compiler(request, workflow_name): class TestResumableWorkflows(object): - def _compile_execution( + def _prepare_execution_and_get_workflow_ctx( self, model, resource, @@ -265,16 +265,16 @@ class TestResumableWorkflows(object): } ) model.service.update(service) - compiler = execution_compiler.ExecutionCompiler( + compiler = execution_preparer.ExecutionPreparer( model, resource, None, service, 'custom_workflow' ) - ctx = compiler.compile(inputs, executor) + ctx = compiler.prepare(inputs, executor) model.execution.update(ctx.execution) return ctx @staticmethod - def _wait_for_active_and_cancel(eng, ctx): + def _cancel_active_execution(eng, ctx): if custom_events['is_active'].wait(60) is False: raise TimeoutError("is_active wasn't set to True") eng.cancel_execution(ctx) @@ -285,7 +285,7 @@ class TestResumableWorkflows(object): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_pass_first_task_only) - ctx = self._compile_execution( + ctx = self._prepare_execution_and_get_workflow_ctx( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -301,7 +301,7 @@ class TestResumableWorkflows(object): wf_thread.start() # Wait for the execution to start - self._wait_for_active_and_cancel(eng, ctx) + self._cancel_active_execution(eng, ctx) node = ctx.model.node.refresh(node) tasks = ctx.model.task.list(filters={'_stub_type': None}) @@ -310,7 +310,7 @@ class TestResumableWorkflows(object): custom_events['is_resumed'].set() assert any(task.status == task.RETRYING for task in tasks) - # Create a new workflow runner, with an existing execution id. This would cause + # Create a new workflow engine, with an existing execution id. This would cause # the old execution to restart. new_engine = engine.Engine(thread_executor) new_engine.execute(ctx, resuming=True) @@ -326,7 +326,7 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_stuck_task) - ctx = self._compile_execution( + ctx = self._prepare_execution_and_get_workflow_ctx( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -340,7 +340,7 @@ class TestResumableWorkflows(object): wf_thread.daemon = True wf_thread.start() - self._wait_for_active_and_cancel(eng, ctx) + self._cancel_active_execution(eng, ctx) node = workflow_context.model.node.refresh(node) task = workflow_context.model.task.list(filters={'_stub_type': None})[0] assert node.attributes['invocations'].value == 1 @@ -366,7 +366,7 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_failed_before_resuming) - ctx = self._compile_execution( + ctx = self._prepare_execution_and_get_workflow_ctx( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -378,7 +378,7 @@ class TestResumableWorkflows(object): wf_thread.setDaemon(True) wf_thread.start() - self._wait_for_active_and_cancel(eng, ctx) + self._cancel_active_execution(eng, ctx) node = workflow_context.model.node.refresh(node) task = workflow_context.model.task.list(filters={'_stub_type': None})[0] @@ -409,7 +409,7 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_pass_first_task_only) - ctx = self._compile_execution( + ctx = self._prepare_execution_and_get_workflow_ctx( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -457,7 +457,7 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_fail_first_task_only) - ctx = self._compile_execution( + ctx = self._prepare_execution_and_get_workflow_ctx( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/705e81bc/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index fad05de..5af6a2f 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -500,8 +500,7 @@ if __name__ == '__main__': arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter - graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile( - tasks_graph) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph) eng = engine.Engine(executor) eng.execute(workflow_context) return workflow_context.model.node.get_by_name(
