Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner [created] 4cf88f3fa
final touches Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4cf88f3f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4cf88f3f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4cf88f3f Branch: refs/heads/ARIA-408-Remove-execution-creation-from-WorkflowRunner Commit: 4cf88f3fa7281b36fdc0d2022a00c6bdf5699e81 Parents: 86c1c1a Author: max-orlov <[email protected]> Authored: Mon Nov 20 16:08:17 2017 +0200 Committer: max-orlov <[email protected]> Committed: Mon Nov 20 16:08:17 2017 +0200 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 74 +++++--- aria/orchestrator/execution/__init__.py | 17 -- aria/orchestrator/execution/compiler.py | 149 --------------- aria/orchestrator/execution/runner.py | 50 ----- aria/orchestrator/execution_compiler.py | 161 ++++++++++++++++ aria/orchestrator/workflows/core/engine.py | 4 +- tests/orchestrator/context/__init__.py | 2 +- tests/orchestrator/context/test_serialize.py | 2 +- .../execution/test_execution_compiler.py | 189 ++++++------------- .../orchestrator/execution_plugin/test_local.py | 2 +- tests/orchestrator/execution_plugin/test_ssh.py | 2 +- .../orchestrator/workflows/core/test_engine.py | 2 +- .../orchestrator/workflows/core/test_events.py | 2 +- .../executor/test_process_executor_extension.py | 2 +- .../test_process_executor_tracked_changes.py | 2 +- 15 files changed, 276 insertions(+), 384 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 162abfa..61328a1 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -25,8 +25,9 @@ from .. import utils from .. import logger as cli_logger from .. import execution_logging from ..core import aria +from ...orchestrator import execution_compiler from ...modeling.models import Execution -from ...orchestrator import execution +from ...orchestrator.workflows.core.engine import Engine from ...orchestrator.workflows.executor.dry import DryExecutor from ...utils import formatting from ...utils import threading @@ -143,19 +144,19 @@ def start(workflow_name, service = model_storage.service.get_by_name(service_name) executor = DryExecutor() if dry else None # use WorkflowRunner's default executor - new_execution = execution.ExecutionCompiler( + compiler = execution_compiler.ExecutionCompiler( model_storage, resource_storage, plugin_manager, service, workflow_name - ).compile(inputs, executor=executor) - model_storage.execution.put(new_execution) + ) + workflow_ctx = compiler.compile(inputs, executor=executor) - execution_runner = execution.ExecutionRunner(executor=executor) + engine = Engine(executor) logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) - _run_execution(execution_runner, logger, model_storage, dry, mark_pattern) + _run_execution(engine, workflow_ctx, logger, model_storage, dry, mark_pattern) @executions.command(name='resume', @@ -188,35 +189,53 @@ def resume(execution_id, if execution_to_resume.status != execution_to_resume.CANCELLED: logger.info("Can't resume execution {execution.id} - " "execution is in status {execution.status}. " - "Can only resume executions in status {execution_to_resume.CANCELLED}" + "Can only resume executions in status {execution.CANCELLED}" .format(execution=execution_to_resume)) return - - execution_runner = execution.ExecutionRunner(executor, True, retry_failed_tasks) - logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) - _run_execution(execution_runner, logger, model_storage, dry, mark_pattern) + workflow_ctx = execution_compiler.ExecutionCompiler( + model_storage, + resource_storage, + plugin_manager, + execution_to_resume.service, + execution_to_resume.workflow_name + ).compile(execution_id=execution_to_resume.id) + engine = Engine(executor) -def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern): - execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name, - workflow_runner.execution.workflow_name) - execution_thread = threading.ExceptionThread(target=workflow_runner.execute, - name=execution_thread_name) + logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) + _run_execution(engine, workflow_ctx, logger, model_storage, dry, mark_pattern, + engine_kwargs=dict(resuming=True, retry_failed=retry_failed_tasks)) + + +def _run_execution( + engine, + ctx, + logger, + model_storage, + dry, + mark_pattern, + engine_kwargs=None +): + engine_kwargs = engine_kwargs or {} + engine_kwargs['ctx'] = ctx + execution_thread_name = '{0}_{1}'.format(ctx.execution.service.name, + ctx.execution.workflow_name) + execution_thread = threading.ExceptionThread(target=engine.execute, + name=execution_thread_name, + **engine_kwargs) execution_thread.start() - last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs else 0 - log_iterator = cli_logger.ModelLogIterator(model_storage, - workflow_runner.execution_id, - offset=last_task_id) + last_task_id = ctx.execution.logs[-1].id if ctx.execution.logs else 0 + log_iterator = cli_logger.ModelLogIterator(model_storage, ctx.execution.id, offset=last_task_id) try: while execution_thread.is_alive(): execution_logging.log_list(log_iterator, mark_pattern=mark_pattern) execution_thread.join(1) except KeyboardInterrupt: - _cancel_execution(workflow_runner, execution_thread, logger, log_iterator) + _cancel_execution(engine, ctx, execution_thread, logger, log_iterator) # It might be the case where some logs were written and the execution was terminated, thus we # need to drain the remaining logs. @@ -225,19 +244,18 @@ def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern): # raise any errors from the execution thread (note these are not workflow execution errors) execution_thread.raise_error_if_exists() - execution = workflow_runner.execution - logger.info('Execution has ended with "{0}" status'.format(execution.status)) - if execution.status == Execution.FAILED and execution.error: - logger.info('Execution error:{0}{1}'.format(os.linesep, execution.error)) + logger.info('Execution has ended with "{0}" status'.format(ctx.execution.status)) + if ctx.execution.status == Execution.FAILED and ctx.execution.error: + logger.info('Execution error:{0}{1}'.format(os.linesep, ctx.execution.error)) if dry: # remove traces of the dry execution (including tasks, logs, inputs..) - model_storage.execution.delete(execution) + model_storage.execution.delete(ctx.execution) -def _cancel_execution(workflow_runner, execution_thread, logger, log_iterator): +def _cancel_execution(engine, ctx, execution_thread, logger, log_iterator): logger.info('Cancelling execution. Press Ctrl+C again to force-cancel.') - workflow_runner.cancel() + engine.cancel_execution(ctx) while execution_thread.is_alive(): try: execution_logging.log_list(log_iterator) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution/__init__.py b/aria/orchestrator/execution/__init__.py deleted file mode 100644 index ef17fde..0000000 --- a/aria/orchestrator/execution/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from .compiler import ExecutionCompiler -from .runner import ExecutionRunner \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution/compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution/compiler.py b/aria/orchestrator/execution/compiler.py deleted file mode 100644 index f3866bf..0000000 --- a/aria/orchestrator/execution/compiler.py +++ /dev/null @@ -1,149 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys -from datetime import datetime - -from .. import exceptions -from ..context.workflow import WorkflowContext -from ..workflows import builtin -from ..workflows.core import graph_compiler -from ..workflows.executor.process import ProcessExecutor -from ...modeling import models -from ...modeling import utils as modeling_utils -from ...utils.imports import import_fullname - - -DEFAULT_TASK_MAX_ATTEMPTS = 30 -DEFAULT_TASK_RETRY_INTERVAL = 30 - - -class ExecutionCompiler(object): - def __init__(self, model, resource, plugin, service, workflow_name): - self._model = model - self._resource = resource - self._plugin = plugin - self._service = service - self._workflow_name = workflow_name - self._workflow_context = None - - @property - def workflow_ctx(self): - return self._workflow_context - - def compile( - self, - execution_inputs=None, - executor=None, - task_max_attempts=None, - task_retry_interval=None): - - execution = self._create_execution_model(execution_inputs) - self._model.execution.put(execution) - self._set_ctx(execution, task_max_attempts, task_retry_interval) - self._create_tasks(execution, executor=executor) - self._model.execution.update(execution) - return execution - - def _set_ctx(self, execution, task_max_attempts=None, task_retry_interval=None): - self._workflow_context = WorkflowContext( - name=self.__class__.__name__, - model_storage=self._model, - resource_storage=self._resource, - service_id=execution.service.id, - execution_id=execution.id, - workflow_name=execution.workflow_name, - task_max_attempts=task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS, - task_retry_interval=task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL - ) - - def _create_tasks(self, execution, executor=None): - - # Set default executor and kwargs - executor = executor or ProcessExecutor(plugin_manager=self._plugin) - - # transforming the execution inputs to dict, to pass them to the workflow function - execution_inputs_dict = dict(inp.unwrapped for inp in execution.inputs.itervalues()) - - if len(execution.tasks) == 0: - workflow_fn = self._get_workflow_fn(execution.workflow_name) - self._tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict) - compiler = graph_compiler.GraphCompiler(self.workflow_ctx, executor.__class__) - compiler.compile(self._tasks_graph) - - def _create_execution_model(self, inputs=None): - self._validate_workflow_exists_for_service() - self._validate_no_active_executions() - - execution = models.Execution( - created_at=datetime.utcnow(), - service_fk=self._service.id, - workflow_name=self._workflow_name, - inputs={}) - - if self._workflow_name in builtin.BUILTIN_WORKFLOWS: - workflow_inputs = dict() # built-in workflows don't have any inputs - else: - workflow_inputs = self._service.workflows[self._workflow_name].inputs - - modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - execution.inputs = modeling_utils.merge_parameter_values( - inputs, workflow_inputs, model_cls=models.Input) - - return execution - - def _validate_no_active_executions(self): - active_executions = [e for e in self._service.executions if - e.is_active()] - if active_executions: - raise exceptions.ActiveExecutionsError( - "Can't start execution; Service {0} has an active execution with ID {1}" - .format(self._service.name, active_executions[0].id)) - - def _validate_workflow_exists_for_service(self): - if self._workflow_name not in self._service.workflows and \ - self._workflow_name not in builtin.BUILTIN_WORKFLOWS: - raise exceptions.UndeclaredWorkflowError( - 'No workflow policy {0} declared in service {1}' - .format(self._workflow_name, self._service.name)) - - def _get_workflow_fn(self, workflow_name): - if workflow_name in builtin.BUILTIN_WORKFLOWS: - return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, - workflow_name)) - - workflow = self._service.workflows[workflow_name] - - # TODO: Custom workflow support needs improvement, currently this code uses internal - # knowledge of the resource storage; Instead, workflows should probably be loaded - # in a similar manner to operation plugins. Also consider passing to import_fullname - # as paths instead of appending to sys path. - service_template_resources_path = os.path.join( - self._resource.service_template.base_path, - str(self._service.service_template.id)) - sys.path.append(service_template_resources_path) - - try: - workflow_fn = import_fullname(workflow.function) - except ImportError: - raise exceptions.WorkflowImplementationNotFoundError( - 'Could not find workflow {0} function at {1}'.format( - workflow_name, workflow.function)) - - return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution/runner.py b/aria/orchestrator/execution/runner.py deleted file mode 100644 index a532901..0000000 --- a/aria/orchestrator/execution/runner.py +++ /dev/null @@ -1,50 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Running workflows. -""" - -from ..workflows.core import engine - - -class ExecutionRunner(object): - - def __init__(self, executor, resume=False, retry_failed_tasks=False): - """ - Manages a single workflow execution on a given service. - - :param workflow_name: workflow name - :param service_id: service ID - :param inputs: key-value dict of inputs for the execution - :param model_storage: model storage API ("MAPI") - :param resource_storage: resource storage API ("RAPI") - :param plugin_manager: plugin manager - :param executor: executor for tasks; defaults to a - :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance - :param task_max_attempts: maximum attempts of repeating each failing task - :param task_retry_interval: retry interval between retry attempts of a failing task - """ - - self._is_resume = resume - self._retry_failed_tasks = retry_failed_tasks - self._engine = engine.Engine(executors={executor.__class__: executor}) - - def execute(self, ctx): - self._engine.execute( - ctx=ctx, resuming=self._is_resume, retry_failed=self._retry_failed_tasks) - - def cancel(self, ctx): - self._engine.cancel_execution(ctx) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/execution_compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_compiler.py b/aria/orchestrator/execution_compiler.py new file mode 100644 index 0000000..01e35c1 --- /dev/null +++ b/aria/orchestrator/execution_compiler.py @@ -0,0 +1,161 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +from datetime import datetime + +from . import exceptions +from .context.workflow import WorkflowContext +from .workflows import builtin +from .workflows.core import graph_compiler +from .workflows.executor.process import ProcessExecutor +from ..modeling import models +from ..modeling import utils as modeling_utils +from ..utils.imports import import_fullname + + +DEFAULT_TASK_MAX_ATTEMPTS = 30 +DEFAULT_TASK_RETRY_INTERVAL = 30 + + +class ExecutionCompiler(object): + def __init__( + self, + model, + resource, + plugin, + service, + workflow_name, + task_max_attempts=None, + task_retry_interval=None + ): + self._model = model + self._resource = resource + self._plugin = plugin + self._service = service + self._workflow_name = workflow_name + self._workflow_context = None + self._execution = None + self._task_max_attempts = task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS + self._task_retry_interval = task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL + + @property + def workflow_ctx(self): + if self._workflow_context is None: + self._workflow_context = WorkflowContext( + name=self.__class__.__name__, + model_storage=self._model, + resource_storage=self._resource, + service_id=self._execution.service.id, + execution_id=self._execution.id, + workflow_name=self._execution.workflow_name, + task_max_attempts=self._task_max_attempts, + task_retry_interval=self._task_retry_interval, + ) + return self._workflow_context + + def compile(self, execution_inputs=None, executor=None, execution_id=None): + assert not (execution_inputs and executor and execution_id) + + if execution_id is None: + # If the execution is new + self._execution = self._create_execution_model(execution_inputs) + self._model.execution.put(self._execution) + self._create_tasks(executor) + self._model.execution.update(self._execution) + else: + # If resuming an execution + self._execution = self._model.execution.get(execution_id) + + return self.workflow_ctx + + def _create_tasks(self, executor=None): + + # Set default executor and kwargs + executor = executor or ProcessExecutor(plugin_manager=self._plugin) + + # transforming the execution inputs to dict, to pass them to the workflow function + execution_inputs_dict = dict(inp.unwrapped for inp in self._execution.inputs.itervalues()) + + if len(self._execution.tasks) == 0: + workflow_fn = self._get_workflow_fn(self._execution.workflow_name) + self._tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict) + compiler = graph_compiler.GraphCompiler(self.workflow_ctx, executor.__class__) + compiler.compile(self._tasks_graph) + + def _create_execution_model(self, inputs=None): + self._validate_workflow_exists_for_service() + self._validate_no_active_executions() + + execution = models.Execution( + created_at=datetime.utcnow(), + service_fk=self._service.id, + workflow_name=self._workflow_name, + inputs={}) + + if self._workflow_name in builtin.BUILTIN_WORKFLOWS: + workflow_inputs = dict() # built-in workflows don't have any inputs + else: + workflow_inputs = self._service.workflows[self._workflow_name].inputs + + modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + execution.inputs = modeling_utils.merge_parameter_values( + inputs, workflow_inputs, model_cls=models.Input) + + return execution + + def _validate_no_active_executions(self): + active_executions = [e for e in self._service.executions if + e.is_active()] + if active_executions: + raise exceptions.ActiveExecutionsError( + "Can't start execution; Service {0} has an active execution with ID {1}" + .format(self._service.name, active_executions[0].id)) + + def _validate_workflow_exists_for_service(self): + if self._workflow_name not in self._service.workflows and \ + self._workflow_name not in builtin.BUILTIN_WORKFLOWS: + raise exceptions.UndeclaredWorkflowError( + 'No workflow policy {0} declared in service {1}' + .format(self._workflow_name, self._service.name)) + + def _get_workflow_fn(self, workflow_name): + if workflow_name in builtin.BUILTIN_WORKFLOWS: + return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, + workflow_name)) + + workflow = self._service.workflows[workflow_name] + + # TODO: Custom workflow support needs improvement, currently this code uses internal + # knowledge of the resource storage; Instead, workflows should probably be loaded + # in a similar manner to operation plugins. Also consider passing to import_fullname + # as paths instead of appending to sys path. + service_template_resources_path = os.path.join( + self._resource.service_template.base_path, + str(self._service.service_template.id)) + sys.path.append(service_template_resources_path) + + try: + workflow_fn = import_fullname(workflow.function) + except ImportError: + raise exceptions.WorkflowImplementationNotFoundError( + 'Could not find workflow {0} function at {1}'.format( + workflow_name, workflow.function)) + + return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 0ec3cd8..0d7d2ae 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -36,9 +36,9 @@ class Engine(logger.LoggerMixin): Executes workflows. """ - def __init__(self, executors, **kwargs): + def __init__(self, *executors, **kwargs): super(Engine, self).__init__(**kwargs) - self._executors = executors.copy() + self._executors = dict((e.__class__, e) for e in executors) self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) def execute(self, ctx, resuming=False, retry_failed=False): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 780db07..257cbf7 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -27,6 +27,6 @@ def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) - eng = engine.Engine(executors={executor.__class__: executor}) + eng = engine.Engine(executor) eng.execute(workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 091e23c..6e9c950 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -49,7 +49,7 @@ def test_serialize_operation_context(context, executor, tmpdir): graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) - eng = engine.Engine({executor.__class__: executor}) + eng = engine.Engine(executor) eng.execute(context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/execution/test_execution_compiler.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution/test_execution_compiler.py b/tests/orchestrator/execution/test_execution_compiler.py index d8c8aa3..14332db 100644 --- a/tests/orchestrator/execution/test_execution_compiler.py +++ b/tests/orchestrator/execution/test_execution_compiler.py @@ -18,15 +18,13 @@ import time from threading import Thread, Event from datetime import datetime -import mock import pytest from aria.modeling import exceptions as modeling_exceptions from aria.modeling import models from aria.orchestrator import exceptions from aria.orchestrator import events -from aria.orchestrator import execution as orch_execution -from aria.orchestrator.workflows.executor.process import ProcessExecutor +from aria.orchestrator import execution_compiler from aria.orchestrator.workflows import api from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import thread @@ -84,16 +82,16 @@ def test_missing_workflow_implementation(service, request): def test_builtin_workflow_instantiation(request, model): # validates the workflow runner instantiates properly when provided with a builtin workflow # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) - execution = _get_compiler(request, 'install').compile() - assert len(execution.tasks) == 18 # expecting 18 tasks for 2 node topology + workflow_ctx = _get_compiler(request, 'install').compile() + assert len(workflow_ctx.execution.tasks) == 18 # expecting 18 tasks for 2 node topology def test_custom_workflow_instantiation(request): # validates the workflow runner instantiates properly when provided with a custom workflow # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) mock_workflow = _setup_mock_workflow_in_service(request) - execution = _get_compiler(request, mock_workflow).compile() - assert len(execution.tasks) == 2 # mock workflow creates only start workflow and end workflow task + workflow_ctx = _get_compiler(request, mock_workflow).compile() + assert len(workflow_ctx.execution.tasks) == 2 # mock workflow creates only start workflow and end workflow task def test_existing_active_executions(request, service, model): @@ -116,88 +114,15 @@ def test_existing_executions_but_no_active_ones(request, service, model): _get_compiler(request, 'install').compile() -def test_default_executor(request): - # validates the ProcessExecutor is used by the workflow runner by default - mock_workflow = _setup_mock_workflow_in_service(request) - - with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls: - execution = _get_compiler(request, mock_workflow).compile() - orch_execution.ExecutionRunner(ProcessExecutor()) - _, engine_kwargs = mock_engine_cls.call_args - assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor) - - -def test_custom_executor(request): - mock_workflow = _setup_mock_workflow_in_service(request) - - custom_executor = mock.MagicMock() - with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls: - execution = _get_compiler(request, mock_workflow).compile(executor=custom_executor) - orch_execution.ExecutionRunner(custom_executor) - _, engine_kwargs = mock_engine_cls.call_args - assert engine_kwargs.get('executors').values()[0] == custom_executor - - -def test_task_configuration_parameters(request): - mock_workflow = _setup_mock_workflow_in_service(request) - - task_max_attempts = 5 - task_retry_interval = 7 - with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute') as \ - mock_engine_execute: - compiler = _get_compiler(request, mock_workflow) - execution = compiler.compile(task_max_attempts=task_max_attempts, - task_retry_interval=task_retry_interval) - orch_execution.ExecutionRunner(ProcessExecutor()).execute(compiler.workflow_ctx) - _, engine_kwargs = mock_engine_execute.call_args - assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts - assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval - - -def test_execute(request, service): - mock_workflow = _setup_mock_workflow_in_service(request) - - mock_engine = mock.MagicMock() - with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute', - return_value=mock_engine) as mock_engine_execute: - compiler = _get_compiler(request, mock_workflow) - compiler.compile() - - runner = orch_execution.runner.ExecutionRunner(ProcessExecutor()) - runner.execute(compiler.workflow_ctx) - - _, engine_kwargs = mock_engine_execute.call_args - assert engine_kwargs['ctx'].service.id == service.id - assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow' - - mock_engine_execute.assert_called_once_with(ctx=compiler.workflow_ctx, - resuming=False, - retry_failed=False) - - -def test_cancel_execution(request): - mock_workflow = _setup_mock_workflow_in_service(request) - - mock_engine = mock.MagicMock() - with mock.patch('aria.orchestrator.execution.runner.engine.Engine', return_value=mock_engine): - compiler = _get_compiler(request, mock_workflow) - compiler.compile() - - runner = orch_execution.ExecutionRunner(ProcessExecutor()) - runner.cancel(ctx=compiler.workflow_ctx) - mock_engine.cancel_execution.assert_called_once_with(compiler.workflow_ctx) - - def test_execution_model_creation(request, service): mock_workflow = _setup_mock_workflow_in_service(request) - with mock.patch('aria.orchestrator.execution.runner.engine.Engine'): - execution = _get_compiler(request, mock_workflow).compile() + workflow_ctx = _get_compiler(request, mock_workflow).compile() - assert execution.service.id == service.id - assert execution.workflow_name == mock_workflow - assert execution.created_at <= datetime.utcnow() - assert execution.inputs == dict() + assert workflow_ctx.execution.service.id == service.id + assert workflow_ctx.execution.workflow_name == mock_workflow + assert workflow_ctx.execution.created_at <= datetime.utcnow() + assert workflow_ctx.execution.inputs == dict() def test_execution_inputs_override_workflow_inputs(request): @@ -207,18 +132,17 @@ def test_execution_inputs_override_workflow_inputs(request): inputs=dict((name, models.Input.wrap(name, val)) for name, val in wf_inputs.iteritems())) - with mock.patch('aria.orchestrator.execution.runner.engine.Engine'): - execution = _get_compiler(request, mock_workflow).compile( - execution_inputs={'input2': 'overriding-value2', 'input3': 7} - ) + workflow_ctx = _get_compiler(request, mock_workflow).compile( + execution_inputs={'input2': 'overriding-value2', 'input3': 7} + ) - assert len(execution.inputs) == 3 - # did not override input1 - expecting the default value from the workflow inputs - assert execution.inputs['input1'].value == 'value1' - # overrode input2 - assert execution.inputs['input2'].value == 'overriding-value2' - # overrode input of integer type - assert execution.inputs['input3'].value == 7 + assert len(workflow_ctx.execution.inputs) == 3 + # did not override input1 - expecting the default value from the workflow inputs + assert workflow_ctx.execution.inputs['input1'].value == 'value1' + # overrode input2 + assert workflow_ctx.execution.inputs['input2'].value == 'overriding-value2' + # overrode input of integer type + assert workflow_ctx.execution.inputs['input3'].value == 7 def test_execution_inputs_undeclared_inputs(request): @@ -312,7 +236,7 @@ def _get_compiler(request, workflow_name): resource = request.getfixturevalue('resource') plugin_manager = request.getfixturevalue('plugin_manager') - return orch_execution.ExecutionCompiler( + return execution_compiler.ExecutionCompiler( model, resource, plugin_manager, @@ -323,7 +247,7 @@ def _get_compiler(request, workflow_name): class TestResumableWorkflows(object): - def _create_initial_workflow_runner( + def _compile_execution( self, model, resource, @@ -340,19 +264,19 @@ class TestResumableWorkflows(object): } ) model.service.update(service) - compiler = orch_execution.ExecutionCompiler( + compiler = execution_compiler.ExecutionCompiler( model, resource, None, service, 'custom_workflow' ) - execution = compiler.compile(inputs, executor) - model.execution.update(execution) + ctx = compiler.compile(inputs, executor) + model.execution.update(ctx.execution) - return orch_execution.ExecutionRunner(executor), compiler.workflow_ctx + return ctx @staticmethod - def _wait_for_active_and_cancel(execution_runner, ctx): + def _wait_for_active_and_cancel(eng, ctx): if custom_events['is_active'].wait(60) is False: raise TimeoutError("is_active wasn't set to True") - execution_runner.cancel(ctx) + eng.cancel_execution(ctx) if custom_events['execution_cancelled'].wait(60) is False: raise TimeoutError("Execution did not end") @@ -360,7 +284,7 @@ class TestResumableWorkflows(object): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_pass_first_task_only) - runner, ctx = self._create_initial_workflow_runner( + ctx = self._compile_execution( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -369,12 +293,14 @@ class TestResumableWorkflows(object): inputs={'number_of_tasks': 2} ) - wf_thread = Thread(target=runner.execute, kwargs=dict(ctx=ctx)) + eng = engine.Engine(thread_executor) + + wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx)) wf_thread.daemon = True wf_thread.start() # Wait for the execution to start - self._wait_for_active_and_cancel(runner, ctx) + self._wait_for_active_and_cancel(eng, ctx) node = ctx.model.node.refresh(node) tasks = ctx.model.task.list(filters={'_stub_type': None}) @@ -385,8 +311,8 @@ class TestResumableWorkflows(object): # Create a new workflow runner, with an existing execution id. This would cause # the old execution to restart. - new_wf_runner = orch_execution.ExecutionRunner(thread_executor, True) - new_wf_runner.execute(ctx) + new_engine = engine.Engine(thread_executor) + new_engine.execute(ctx, resuming=True) # Wait for it to finish and assert changes. node = workflow_context.model.node.refresh(node) @@ -399,7 +325,7 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_stuck_task) - wf_runner, ctx = self._create_initial_workflow_runner( + ctx = self._compile_execution( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -408,11 +334,12 @@ class TestResumableWorkflows(object): inputs={'number_of_tasks': 1} ) - wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx)) + eng = engine.Engine(thread_executor) + wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx)) wf_thread.daemon = True wf_thread.start() - self._wait_for_active_and_cancel(wf_runner, ctx) + self._wait_for_active_and_cancel(eng, ctx) node = workflow_context.model.node.refresh(node) task = workflow_context.model.task.list(filters={'_stub_type': None})[0] assert node.attributes['invocations'].value == 1 @@ -422,9 +349,8 @@ class TestResumableWorkflows(object): new_thread_executor = thread.ThreadExecutor() try: - new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True) - - new_wf_runner.execute(ctx) + new_engine = engine.Engine(new_thread_executor) + new_engine.execute(ctx, resuming=True) finally: new_thread_executor.close() @@ -439,17 +365,19 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_failed_before_resuming) - wf_runner, ctx = self._create_initial_workflow_runner( + ctx = self._compile_execution( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], mock_parallel_tasks_workflow, thread_executor) - wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx)) + + eng = engine.Engine(thread_executor) + wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx)) wf_thread.setDaemon(True) wf_thread.start() - self._wait_for_active_and_cancel(wf_runner, ctx) + self._wait_for_active_and_cancel(eng, ctx) node = workflow_context.model.node.refresh(node) task = workflow_context.model.task.list(filters={'_stub_type': None})[0] @@ -464,9 +392,8 @@ class TestResumableWorkflows(object): # the old execution to restart. new_thread_executor = thread.ThreadExecutor() try: - new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True) - - new_wf_runner.execute(ctx) + new_engine = engine.Engine(new_thread_executor) + new_engine.execute(ctx, resuming=True) finally: new_thread_executor.close() @@ -481,7 +408,7 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_pass_first_task_only) - wf_runner, ctx = self._create_initial_workflow_runner( + ctx = self._compile_execution( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -489,7 +416,8 @@ class TestResumableWorkflows(object): thread_executor, inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2} ) - wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx)) + eng = engine.Engine(thread_executor) + wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx)) wf_thread.setDaemon(True) wf_thread.start() @@ -511,8 +439,8 @@ class TestResumableWorkflows(object): custom_events['is_resumed'].set() new_thread_executor = thread.ThreadExecutor() try: - new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True, True) - new_wf_runner.execute(ctx) + new_engine = engine.Engine(new_thread_executor) + new_engine.execute(ctx, resuming=True, retry_failed=True) finally: new_thread_executor.close() @@ -528,7 +456,7 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_fail_first_task_only) - execution_runner, ctx = self._create_initial_workflow_runner( + ctx = self._compile_execution( workflow_context.model, workflow_context.resource, workflow_context.model.service.list()[0], @@ -536,7 +464,8 @@ class TestResumableWorkflows(object): thread_executor, inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2} ) - wf_thread = Thread(target=execution_runner.execute, kwargs=dict(ctx=ctx)) + eng = engine.Engine(thread_executor) + wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx)) wf_thread.setDaemon(True) wf_thread.start() @@ -552,8 +481,8 @@ class TestResumableWorkflows(object): custom_events['is_resumed'].set() new_thread_executor = thread.ThreadExecutor() try: - new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, resume=True) - new_wf_runner.execute(ctx) + new_engine = engine.Engine(new_thread_executor) + new_engine.execute(ctx, resuming=True) finally: new_thread_executor.close() @@ -603,7 +532,7 @@ class TestResumableWorkflows(object): graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) workflow_context.execution = execution - return engine.Engine(executors={executor.__class__: executor}) + return engine.Engine(executor) @pytest.fixture(autouse=True) def register_to_events(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 599383d..fad05de 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -502,7 +502,7 @@ if __name__ == '__main__': tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile( tasks_graph) - eng = engine.Engine({executor.__class__: executor}) + eng = engine.Engine(executor) eng.execute(workflow_context) return workflow_context.model.node.get_by_name( mock.models.DEPENDENCY_NODE_NAME).attributes http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index b5df939..e39f3ba 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -263,7 +263,7 @@ class TestWithActualSSHServer(object): tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter graph_compiler.GraphCompiler( self._workflow_context, self._executor.__class__).compile(tasks_graph) - eng = engine.Engine({self._executor.__class__: self._executor}) + eng = engine.Engine(self._executor) eng.execute(self._workflow_context) return self._workflow_context.model.node.get_by_name( mock.models.DEPENDENCY_NODE_NAME).attributes http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 0c704f5..2c2a06a 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -52,7 +52,7 @@ class BaseTest(object): graph = workflow_func(ctx=workflow_context) graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) - return engine.Engine(executors={executor.__class__: executor}) + return engine.Engine(executor) @staticmethod def _create_interface(ctx, func, arguments=None): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index d804de5..bb5bb75 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ -132,7 +132,7 @@ def run_operation_on_node(ctx, op_name, interface_name, executor): single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name) ) - eng = engine.Engine(executors={executor.__class__: executor}) + eng = engine.Engine(executor) eng.execute(ctx) return node http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index b26fa43..0093976 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -58,7 +58,7 @@ def test_decorate_extension(context, executor): return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) - eng = engine.Engine({executor.__class__: executor}) + eng = engine.Engine(executor) eng.execute(context) out = get_node(context).attributes.get('out').value assert out['wrapper_arguments'] == arguments http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4cf88f3f/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 47ee2f7..8aaf4ef 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -108,7 +108,7 @@ def _run_workflow(context, executor, op_func, arguments=None): return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) - eng = engine.Engine({executor.__class__: executor}) + eng = engine.Engine(executor) eng.execute(context) out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out') return out.value if out else None
