wip 2
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/da233c73 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/da233c73 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/da233c73 Branch: refs/heads/extract_execution_creation_from_workflow_runner Commit: da233c730aee305806aaa83177b6b0dbd852c264 Parents: 9611f61 Author: max-orlov <[email protected]> Authored: Sun Nov 19 15:56:32 2017 +0200 Committer: max-orlov <[email protected]> Committed: Mon Nov 20 10:14:05 2017 +0200 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 44 +- aria/orchestrator/execution/__init__.py | 17 + aria/orchestrator/execution/compiler.py | 149 ++++ aria/orchestrator/execution/runner.py | 50 ++ aria/orchestrator/workflow_runner.py | 210 ------ .../workflows/core/graph_compiler.py | 12 +- test_ssh.py | 528 -------------- tests/orchestrator/context/__init__.py | 2 +- tests/orchestrator/context/test_serialize.py | 2 +- tests/orchestrator/execution/__init__.py | 14 + .../execution/test_execution_compiler.py | 698 ++++++++++++++++++ .../orchestrator/execution_plugin/test_local.py | 3 +- tests/orchestrator/execution_plugin/test_ssh.py | 2 +- tests/orchestrator/test_workflow_runner.py | 726 ------------------- .../orchestrator/workflows/core/test_engine.py | 2 +- .../orchestrator/workflows/core/test_events.py | 2 +- .../test_task_graph_into_execution_graph.py | 2 +- .../executor/test_process_executor_extension.py | 2 +- .../test_process_executor_tracked_changes.py | 2 +- 19 files changed, 962 insertions(+), 1505 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index 2ab3a33..162abfa 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -26,7 +26,7 @@ from .. import logger as cli_logger from .. import execution_logging from ..core import aria from ...modeling.models import Execution -from ...orchestrator import workflow_runner +from ...orchestrator import execution from ...orchestrator.workflows.executor.dry import DryExecutor from ...utils import formatting from ...utils import threading @@ -143,20 +143,19 @@ def start(workflow_name, service = model_storage.service.get_by_name(service_name) executor = DryExecutor() if dry else None # use WorkflowRunner's default executor - execution = workflow_runner.create_execution_model( - service, workflow_name, inputs - ) - model_storage.execution.put(execution) - - wf_runner = \ - workflow_runner.WorkflowRunner( - model_storage, resource_storage, plugin_manager, - execution=execution, executor=executor, - task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval - ) + new_execution = execution.ExecutionCompiler( + model_storage, + resource_storage, + plugin_manager, + service, + workflow_name + ).compile(inputs, executor=executor) + model_storage.execution.put(new_execution) + + execution_runner = execution.ExecutionRunner(executor=executor) logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) - _run_execution(wf_runner, logger, model_storage, dry, mark_pattern) + _run_execution(execution_runner, logger, model_storage, dry, mark_pattern) @executions.command(name='resume', @@ -185,23 +184,18 @@ def resume(execution_id, """ executor = DryExecutor() if dry else None # use WorkflowRunner's default executor - execution = model_storage.execution.get(execution_id) - if execution.status != execution.CANCELLED: + execution_to_resume = model_storage.execution.get(execution_id) + if execution_to_resume.status != execution_to_resume.CANCELLED: logger.info("Can't resume execution {execution.id} - " "execution is in status {execution.status}. " - "Can only resume executions in status {valid_status}" - .format(execution=execution, valid_status=execution.CANCELLED)) + "Can only resume executions in status {execution_to_resume.CANCELLED}" + .format(execution=execution_to_resume)) return - - wf_runner = \ - workflow_runner.WorkflowRunner( - model_storage, resource_storage, plugin_manager, - execution=execution, executor=executor, resume=True, - retry_failed_tasks=retry_failed_tasks, - ) + + execution_runner = execution.ExecutionRunner(executor, True, retry_failed_tasks) logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) - _run_execution(wf_runner, logger, model_storage, dry, mark_pattern) + _run_execution(execution_runner, logger, model_storage, dry, mark_pattern) def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution/__init__.py b/aria/orchestrator/execution/__init__.py new file mode 100644 index 0000000..ef17fde --- /dev/null +++ b/aria/orchestrator/execution/__init__.py @@ -0,0 +1,17 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from .compiler import ExecutionCompiler +from .runner import ExecutionRunner \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution/compiler.py b/aria/orchestrator/execution/compiler.py new file mode 100644 index 0000000..5db52d4 --- /dev/null +++ b/aria/orchestrator/execution/compiler.py @@ -0,0 +1,149 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import sys +from datetime import datetime + +from .. import exceptions +from ..context.workflow import WorkflowContext +from ..workflows import builtin +from ..workflows.core import graph_compiler +from ..workflows.executor.process import ProcessExecutor +from ...modeling import models +from ...modeling import utils as modeling_utils +from ...utils.imports import import_fullname + + +DEFAULT_TASK_MAX_ATTEMPTS = 30 +DEFAULT_TASK_RETRY_INTERVAL = 30 + + +class ExecutionCompiler(object): + def __init__(self, model, resource, plugin, service, workflow_name): + self._model = model + self._resource = resource + self._plugin = plugin + self._service = service + self._workflow_name = workflow_name + self._workflow_context = None + + @property + def workflow_ctx(self): + return self._workflow_context + + def compile( + self, + execution_inputs=None, + executor=None, + task_max_attempts=None, + task_retry_interval=None): + + execution = self._create_execution_model(execution_inputs) + self._model.execution.put(execution) + self._set_ctx(execution, task_max_attempts, task_retry_interval) + self._create_tasks(execution, executor=executor) + self._model.execution.update(execution) + return execution + + def _set_ctx(self, execution, task_max_attempts=None, task_retry_interval=None): + self._workflow_context = WorkflowContext( + name=self.__class__.__name__, + model_storage=self._model, + resource_storage=self._resource, + service_id=execution.service.id, + execution_id=execution.id, + workflow_name=execution.workflow_name, + task_max_attempts=task_max_attempts or DEFAULT_TASK_MAX_ATTEMPTS, + task_retry_interval=task_retry_interval or DEFAULT_TASK_RETRY_INTERVAL + ) + + def _create_tasks(self, execution, executor=None): + + # Set default executor and kwargs + executor = executor or ProcessExecutor(plugin_manager=self._plugin) + + # transforming the execution inputs to dict, to pass them to the workflow function + execution_inputs_dict = dict(inp.unwrapped for inp in execution.inputs.itervalues()) + + if len(execution.tasks) == 0: + workflow_fn = self._get_workflow_fn(execution.workflow_name) + self._tasks_graph = workflow_fn(ctx=self.workflow_ctx, **execution_inputs_dict) + compiler = graph_compiler.GraphCompiler(self.workflow_ctx.execution, executor.__class__) + compiler.compile(self._tasks_graph) + + def _create_execution_model(self, inputs=None): + self._validate_workflow_exists_for_service() + self._validate_no_active_executions() + + execution = models.Execution( + created_at=datetime.utcnow(), + service_fk=self._service.id, + workflow_name=self._workflow_name, + inputs={}) + + if self._workflow_name in builtin.BUILTIN_WORKFLOWS: + workflow_inputs = dict() # built-in workflows don't have any inputs + else: + workflow_inputs = self._service.workflows[self._workflow_name].inputs + + modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, + supplied_inputs=inputs or {}) + execution.inputs = modeling_utils.merge_parameter_values( + inputs, workflow_inputs, model_cls=models.Input) + + return execution + + def _validate_no_active_executions(self): + active_executions = [e for e in self._service.executions if + e.is_active()] + if active_executions: + raise exceptions.ActiveExecutionsError( + "Can't start execution; Service {0} has an active execution with ID {1}" + .format(self._service.name, active_executions[0].id)) + + def _validate_workflow_exists_for_service(self): + if self._workflow_name not in self._service.workflows and \ + self._workflow_name not in builtin.BUILTIN_WORKFLOWS: + raise exceptions.UndeclaredWorkflowError( + 'No workflow policy {0} declared in service {1}' + .format(self._workflow_name, self._service.name)) + + def _get_workflow_fn(self, workflow_name): + if workflow_name in builtin.BUILTIN_WORKFLOWS: + return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, + workflow_name)) + + workflow = self._service.workflows[workflow_name] + + # TODO: Custom workflow support needs improvement, currently this code uses internal + # knowledge of the resource storage; Instead, workflows should probably be loaded + # in a similar manner to operation plugins. Also consider passing to import_fullname + # as paths instead of appending to sys path. + service_template_resources_path = os.path.join( + self._resource.service_template.base_path, + str(self._service.service_template.id)) + sys.path.append(service_template_resources_path) + + try: + workflow_fn = import_fullname(workflow.function) + except ImportError: + raise exceptions.WorkflowImplementationNotFoundError( + 'Could not find workflow {0} function at {1}'.format( + workflow_name, workflow.function)) + + return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/execution/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution/runner.py b/aria/orchestrator/execution/runner.py new file mode 100644 index 0000000..a532901 --- /dev/null +++ b/aria/orchestrator/execution/runner.py @@ -0,0 +1,50 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Running workflows. +""" + +from ..workflows.core import engine + + +class ExecutionRunner(object): + + def __init__(self, executor, resume=False, retry_failed_tasks=False): + """ + Manages a single workflow execution on a given service. + + :param workflow_name: workflow name + :param service_id: service ID + :param inputs: key-value dict of inputs for the execution + :param model_storage: model storage API ("MAPI") + :param resource_storage: resource storage API ("RAPI") + :param plugin_manager: plugin manager + :param executor: executor for tasks; defaults to a + :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance + :param task_max_attempts: maximum attempts of repeating each failing task + :param task_retry_interval: retry interval between retry attempts of a failing task + """ + + self._is_resume = resume + self._retry_failed_tasks = retry_failed_tasks + self._engine = engine.Engine(executors={executor.__class__: executor}) + + def execute(self, ctx): + self._engine.execute( + ctx=ctx, resuming=self._is_resume, retry_failed=self._retry_failed_tasks) + + def cancel(self, ctx): + self._engine.cancel_execution(ctx) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py deleted file mode 100644 index aebde8e..0000000 --- a/aria/orchestrator/workflow_runner.py +++ /dev/null @@ -1,210 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Running workflows. -""" - -import os -import sys -from datetime import datetime - -from . import exceptions -from .context.workflow import WorkflowContext -from .workflows import builtin -from .workflows.core import engine, graph_compiler -from .workflows.executor.process import ProcessExecutor -from ..modeling import models -from ..modeling import utils as modeling_utils -from ..utils.imports import import_fullname - - -DEFAULT_TASK_MAX_ATTEMPTS = 30 -DEFAULT_TASK_RETRY_INTERVAL = 30 - - -class WorkflowRunner(object): - - def __init__( - self, - model_storage, - resource_storage, - plugin_manager, - execution, - executor=None, - resume=False, - retry_failed_tasks=False, - task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, - task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL - ): - """ - Manages a single workflow execution on a given service. - - :param workflow_name: workflow name - :param service_id: service ID - :param inputs: key-value dict of inputs for the execution - :param model_storage: model storage API ("MAPI") - :param resource_storage: resource storage API ("RAPI") - :param plugin_manager: plugin manager - :param executor: executor for tasks; defaults to a - :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance - :param task_max_attempts: maximum attempts of repeating each failing task - :param task_retry_interval: retry interval between retry attempts of a failing task - """ - - self._is_resume = resume - self._retry_failed_tasks = retry_failed_tasks - - self._model_storage = model_storage - self._resource_storage = resource_storage - self._execution = execution - - # the IDs are stored rather than the models themselves, so this module could be used - # by several threads without raising errors on model objects shared between threads - - self._workflow_context = WorkflowContext( - name=self.__class__.__name__, - model_storage=self._model_storage, - resource_storage=resource_storage, - service_id=self.execution.service.id, - execution_id=self.execution.id, - workflow_name=self.execution.workflow_name, - task_max_attempts=task_max_attempts, - task_retry_interval=task_retry_interval) - - # Set default executor and kwargs - executor = executor or ProcessExecutor(plugin_manager=plugin_manager) - - # transforming the execution inputs to dict, to pass them to the workflow function - execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.itervalues()) - - if not self._is_resume: - workflow_fn = self._get_workflow_fn() - self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__) - compiler.compile(self._tasks_graph) - - self._engine = engine.Engine(executors={executor.__class__: executor}) - - @property - def execution_id(self): - return self.execution.id - - @property - def execution(self): - return self._model_storage.execution.get(self.execution_id) - - @property - def service(self): - return self._model_storage.service.get(self._execution.service.id) - - def execute(self): - self._engine.execute(ctx=self._workflow_context, - resuming=self._is_resume, - retry_failed=self._retry_failed_tasks) - - def cancel(self): - self._engine.cancel_execution(ctx=self._workflow_context) - - def _get_workflow_fn(self): - if self._workflow_name in builtin.BUILTIN_WORKFLOWS: - return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, - self.execution.workflow_name)) - - workflow = self.service.workflows[self.execution.workflow_name] - - # TODO: Custom workflow support needs improvement, currently this code uses internal - # knowledge of the resource storage; Instead, workflows should probably be loaded - # in a similar manner to operation plugins. Also consider passing to import_fullname - # as paths instead of appending to sys path. - service_template_resources_path = os.path.join( - self._resource_storage.service_template.base_path, - str(self.service.service_template.id)) - sys.path.append(service_template_resources_path) - - try: - workflow_fn = import_fullname(workflow.function) - except ImportError: - raise exceptions.WorkflowImplementationNotFoundError( - 'Could not find workflow {0} function at {1}'.format( - self._workflow_name, workflow.function)) - - return workflow_fn - - -def create_execution_model(service, workflow_name, inputs): - _validate_workflow_exists_for_service(service, workflow_name) - _validate_no_active_executions(service) - execution = models.Execution( - created_at=datetime.utcnow(), - service_fk=service.id, - workflow_name=workflow_name, - inputs={}) - - if workflow_name in builtin.BUILTIN_WORKFLOWS: - workflow_inputs = dict() # built-in workflows don't have any inputs - else: - workflow_inputs = service.workflows[workflow_name].inputs - - modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - execution.inputs = modeling_utils.merge_parameter_values( - inputs, workflow_inputs, model_cls=models.Input) - - return execution - - - -def _create_execution_model(self, inputs): - execution = models.Execution( - created_at=datetime.utcnow(), - service=self.service, - workflow_name=self._workflow_name, - inputs={}) - - if self._workflow_name in builtin.BUILTIN_WORKFLOWS: - workflow_inputs = dict() # built-in workflows don't have any inputs - else: - workflow_inputs = self.service.workflows[self._workflow_name].inputs - - modeling_utils.validate_no_undeclared_inputs(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - modeling_utils.validate_required_inputs_are_supplied(declared_inputs=workflow_inputs, - supplied_inputs=inputs or {}) - execution.inputs = modeling_utils.merge_parameter_values( - inputs, workflow_inputs, model_cls=models.Input) - # TODO: these two following calls should execute atomically - self._validate_no_active_executions(execution) - self._model_storage.execution.put(execution) - return execution - - - -def _validate_no_active_executions(service): - active_executions = [e for e in service.executions if e.is_active()] - if active_executions: - raise exceptions.ActiveExecutionsError( - "Can't start execution; Service {0} has an active execution with ID {1}" - .format(service.name, active_executions[0].id)) - - -def _validate_workflow_exists_for_service(service, workflow_name): - if workflow_name not in service.workflows and \ - workflow_name not in builtin.BUILTIN_WORKFLOWS: - raise exceptions.UndeclaredWorkflowError( - 'No workflow policy {0} declared in service {1}' - .format(workflow_name, service.name)) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/aria/orchestrator/workflows/core/graph_compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py index 81543d5..aeb05bb 100644 --- a/aria/orchestrator/workflows/core/graph_compiler.py +++ b/aria/orchestrator/workflows/core/graph_compiler.py @@ -19,8 +19,8 @@ from .. import executor, api class GraphCompiler(object): - def __init__(self, ctx, default_executor): - self._ctx = ctx + def __init__(self, execution, default_executor): + self._execution = execution self._default_executor = default_executor self._stub_executor = executor.base.StubTaskExecutor self._model_to_api_id = {} @@ -65,7 +65,7 @@ class GraphCompiler(object): # Insert end marker self._create_stub_task( end_stub_type, - self._get_non_dependent_tasks(self._ctx.execution) or [start_task], + self._get_non_dependent_tasks(self._execution) or [start_task], self._end_graph_suffix(task_graph.id), task_graph.name ) @@ -74,17 +74,15 @@ class GraphCompiler(object): model_task = models.Task( name=name, dependencies=dependencies, - execution=self._ctx.execution, + execution=self._execution, _executor=self._stub_executor, _stub_type=stub_type) - self._ctx.model.task.put(model_task) self._model_to_api_id[model_task.id] = api_id return model_task def _create_operation_task(self, api_task, dependencies): model_task = models.Task.from_api_task( api_task, self._default_executor, dependencies=dependencies) - self._ctx.model.task.put(model_task) self._model_to_api_id[model_task.id] = api_task.id return model_task @@ -113,6 +111,6 @@ class GraphCompiler(object): dependency_name = dependency.id else: dependency_name = self._end_graph_suffix(dependency.id) - tasks.extend(task for task in self._ctx.execution.tasks + tasks.extend(task for task in self._execution.tasks if self._model_to_api_id.get(task.id, None) == dependency_name) return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/test_ssh.py ---------------------------------------------------------------------- diff --git a/test_ssh.py b/test_ssh.py deleted file mode 100644 index 5256cf8..0000000 --- a/test_ssh.py +++ /dev/null @@ -1,528 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import contextlib -import json -import logging -import os - -import pytest - -import fabric.api -from fabric.contrib import files -from fabric import context_managers - -from aria.modeling import models -from aria.orchestrator import events -from aria.orchestrator import workflow -from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.executor import process -from aria.orchestrator.workflows.core import engine, graph_compiler -from aria.orchestrator.workflows.exceptions import ExecutorException -from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException -from aria.orchestrator.execution_plugin import operations -from aria.orchestrator.execution_plugin import constants -from aria.orchestrator.execution_plugin.exceptions import ProcessException, TaskException -from aria.orchestrator.execution_plugin.ssh import operations as ssh_operations - -from tests import mock, storage, resources -from tests.orchestrator.workflows.helpers import events_collector - -_CUSTOM_BASE_DIR = '/tmp/new-aria-ctx' - -import tests -KEY_FILENAME = os.path.join(tests.ROOT_DIR, 'tests/resources/keys/test') - -_FABRIC_ENV = { - 'disable_known_hosts': True, - 'user': 'test', - 'key_filename': KEY_FILENAME -} - - -import mockssh [email protected](scope='session') -def server(): - with mockssh.Server({'test': KEY_FILENAME}) as s: - yield s - - -#@pytest.mark.skipif(not os.environ.get('TRAVIS'), reason='actual ssh server required') -class TestWithActualSSHServer(object): - - def test_run_script_basic(self): - expected_attribute_value = 'some_value' - props = self._execute(env={'test_value': expected_attribute_value}) - assert props['test_value'].value == expected_attribute_value - - @pytest.mark.skip(reason='sudo privileges are required') - def test_run_script_as_sudo(self): - self._execute(use_sudo=True) - with self._ssh_env(): - assert files.exists('/opt/test_dir') - fabric.api.sudo('rm -rf /opt/test_dir') - - def test_run_script_default_base_dir(self): - props = self._execute() - assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR) - - @pytest.mark.skip(reason='Re-enable once output from process executor can be captured') - @pytest.mark.parametrize('hide_groups', [[], ['everything']]) - def test_run_script_with_hide(self, hide_groups): - self._execute(hide_output=hide_groups) - output = 'TODO' - expected_log_message = ('[localhost] run: source {0}/scripts/' - .format(constants.DEFAULT_BASE_DIR)) - if hide_groups: - assert expected_log_message not in output - else: - assert expected_log_message in output - - def test_run_script_process_config(self): - expected_env_value = 'test_value_env' - expected_arg1_value = 'test_value_arg1' - expected_arg2_value = 'test_value_arg2' - expected_cwd = '/tmp' - expected_base_dir = _CUSTOM_BASE_DIR - props = self._execute( - env={'test_value_env': expected_env_value}, - process={ - 'args': [expected_arg1_value, expected_arg2_value], - 'cwd': expected_cwd, - 'base_dir': expected_base_dir - }) - assert props['env_value'].value == expected_env_value - assert len(props['bash_version'].value) > 0 - assert props['arg1_value'].value == expected_arg1_value - assert props['arg2_value'].value == expected_arg2_value - assert props['cwd'].value == expected_cwd - assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir) - - def test_run_script_command_prefix(self): - props = self._execute(process={'command_prefix': 'bash -i'}) - assert 'i' in props['dollar_dash'].value - - def test_run_script_reuse_existing_ctx(self): - expected_test_value_1 = 'test_value_1' - expected_test_value_2 = 'test_value_2' - props = self._execute( - test_operations=['{0}_1'.format(self.test_name), - '{0}_2'.format(self.test_name)], - env={'test_value1': expected_test_value_1, - 'test_value2': expected_test_value_2}) - assert props['test_value1'].value == expected_test_value_1 - assert props['test_value2'].value == expected_test_value_2 - - def test_run_script_download_resource_plain(self, tmpdir): - resource = tmpdir.join('resource') - resource.write('content') - self._upload(str(resource), 'test_resource') - props = self._execute() - assert props['test_value'].value == 'content' - - def test_run_script_download_resource_and_render(self, tmpdir): - resource = tmpdir.join('resource') - resource.write('{{ctx.service.name}}') - self._upload(str(resource), 'test_resource') - props = self._execute() - assert props['test_value'].value == self._workflow_context.service.name - - @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}]) - def test_run_script_inputs_as_env_variables_no_override(self, value): - props = self._execute(custom_input=value) - return_value = props['test_value'].value - expected = return_value if isinstance(value, basestring) else json.loads(return_value) - assert value == expected - - @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}]) - def test_run_script_inputs_as_env_variables_process_env_override(self, value): - props = self._execute(custom_input='custom-input-value', - env={'custom_env_var': value}) - return_value = props['test_value'].value - expected = return_value if isinstance(value, basestring) else json.loads(return_value) - assert value == expected - - def test_run_script_error_in_script(self): - exception = self._execute_and_get_task_exception() - assert isinstance(exception, TaskException) - - def test_run_script_abort_immediate(self): - exception = self._execute_and_get_task_exception() - assert isinstance(exception, TaskAbortException) - assert exception.message == 'abort-message' - - def test_run_script_retry(self): - exception = self._execute_and_get_task_exception() - assert isinstance(exception, TaskRetryException) - assert exception.message == 'retry-message' - - def test_run_script_abort_error_ignored_by_script(self): - exception = self._execute_and_get_task_exception() - assert isinstance(exception, TaskAbortException) - assert exception.message == 'abort-message' - - def test_run_commands(self): - temp_file_path = '/tmp/very_temporary_file' - with self._ssh_env(): - if files.exists(temp_file_path): - fabric.api.run('rm {0}'.format(temp_file_path)) - self._execute(commands=['touch {0}'.format(temp_file_path)]) - with self._ssh_env(): - assert files.exists(temp_file_path) - fabric.api.run('rm {0}'.format(temp_file_path)) - - @pytest.fixture(autouse=True) - def _setup(self, request, workflow_context, executor, capfd, server): - print 'HI!!!!!!!!!!', server.port - self._workflow_context = workflow_context - self._executor = executor - self._capfd = capfd - self.test_name = request.node.originalname or request.node.name - with self._ssh_env(server): - for directory in [constants.DEFAULT_BASE_DIR, _CUSTOM_BASE_DIR]: - if files.exists(directory): - fabric.api.run('rm -rf {0}'.format(directory)) - - @contextlib.contextmanager - def _ssh_env(self, server): - with self._capfd.disabled(): - with context_managers.settings(fabric.api.hide('everything'), - host_string='localhost:{0}'.format(server.port), - **_FABRIC_ENV): - yield - - def _execute(self, - env=None, - use_sudo=False, - hide_output=None, - process=None, - custom_input='', - test_operations=None, - commands=None): - process = process or {} - if env: - process.setdefault('env', {}).update(env) - - test_operations = test_operations or [self.test_name] - - local_script_path = os.path.join(resources.DIR, 'scripts', 'test_ssh.sh') - script_path = os.path.basename(local_script_path) - self._upload(local_script_path, script_path) - - if commands: - operation = operations.run_commands_with_ssh - else: - operation = operations.run_script_with_ssh - - node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - arguments = { - 'script_path': script_path, - 'fabric_env': _FABRIC_ENV, - 'process': process, - 'use_sudo': use_sudo, - 'custom_env_var': custom_input, - 'test_operation': '', - } - if hide_output: - arguments['hide_output'] = hide_output - if commands: - arguments['commands'] = commands - interface = mock.models.create_interface( - node.service, - 'test', - 'op', - operation_kwargs=dict( - function='{0}.{1}'.format( - operations.__name__, - operation.__name__), - arguments=arguments) - ) - node.interfaces[interface.name] = interface - - @workflow - def mock_workflow(ctx, graph): - ops = [] - for test_operation in test_operations: - op_arguments = arguments.copy() - op_arguments['test_operation'] = test_operation - ops.append(api.task.OperationTask( - node, - interface_name='test', - operation_name='op', - arguments=op_arguments)) - - graph.sequence(*ops) - return graph - tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter - graph_compiler.GraphCompiler( - self._workflow_context, self._executor.__class__).compile(tasks_graph) - eng = engine.Engine({self._executor.__class__: self._executor}) - eng.execute(self._workflow_context) - return self._workflow_context.model.node.get_by_name( - mock.models.DEPENDENCY_NODE_NAME).attributes - - def _execute_and_get_task_exception(self, *args, **kwargs): - signal = events.on_failure_task_signal - with events_collector(signal) as collected: - with pytest.raises(ExecutorException): - self._execute(*args, **kwargs) - return collected[signal][0]['kwargs']['exception'] - - def _upload(self, source, path): - self._workflow_context.resource.service.upload( - entry_id=str(self._workflow_context.service.id), - source=source, - path=path) - - @pytest.fixture - def executor(self): - result = process.ProcessExecutor() - try: - yield result - finally: - result.close() - - @pytest.fixture - def workflow_context(self, tmpdir): - workflow_context = mock.context.simple(str(tmpdir)) - workflow_context.states = [] - workflow_context.exception = None - yield workflow_context - storage.release_sqlite_storage(workflow_context.model) - - -class TestFabricEnvHideGroupsAndRunCommands(object): - - def test_fabric_env_default_override(self): - # first sanity for no override - self._run() - assert self.mock.settings_merged['timeout'] == constants.FABRIC_ENV_DEFAULTS['timeout'] - # now override - invocation_fabric_env = self.default_fabric_env.copy() - timeout = 1000000 - invocation_fabric_env['timeout'] = timeout - self._run(fabric_env=invocation_fabric_env) - assert self.mock.settings_merged['timeout'] == timeout - - def test_implicit_host_string(self, mocker): - expected_host_address = '1.1.1.1' - mocker.patch.object(self._Ctx.task.actor, 'host') - mocker.patch.object(self._Ctx.task.actor.host, 'host_address', expected_host_address) - fabric_env = self.default_fabric_env.copy() - del fabric_env['host_string'] - self._run(fabric_env=fabric_env) - assert self.mock.settings_merged['host_string'] == expected_host_address - - def test_explicit_host_string(self): - fabric_env = self.default_fabric_env.copy() - host_string = 'explicit_host_string' - fabric_env['host_string'] = host_string - self._run(fabric_env=fabric_env) - assert self.mock.settings_merged['host_string'] == host_string - - def test_override_warn_only(self): - fabric_env = self.default_fabric_env.copy() - self._run(fabric_env=fabric_env) - assert self.mock.settings_merged['warn_only'] is True - fabric_env = self.default_fabric_env.copy() - fabric_env['warn_only'] = False - self._run(fabric_env=fabric_env) - assert self.mock.settings_merged['warn_only'] is False - - def test_missing_host_string(self): - with pytest.raises(TaskAbortException) as exc_ctx: - fabric_env = self.default_fabric_env.copy() - del fabric_env['host_string'] - self._run(fabric_env=fabric_env) - assert '`host_string` not supplied' in str(exc_ctx.value) - - def test_missing_user(self): - with pytest.raises(TaskAbortException) as exc_ctx: - fabric_env = self.default_fabric_env.copy() - del fabric_env['user'] - self._run(fabric_env=fabric_env) - assert '`user` not supplied' in str(exc_ctx.value) - - def test_missing_key_or_password(self): - with pytest.raises(TaskAbortException) as exc_ctx: - fabric_env = self.default_fabric_env.copy() - del fabric_env['key_filename'] - self._run(fabric_env=fabric_env) - assert 'Access credentials not supplied' in str(exc_ctx.value) - - def test_hide_in_settings_and_non_viable_groups(self): - groups = ('running', 'stdout') - self._run(hide_output=groups) - assert set(self.mock.settings_merged['hide_output']) == set(groups) - with pytest.raises(TaskAbortException) as exc_ctx: - self._run(hide_output=('running', 'bla')) - assert '`hide_output` must be a subset of' in str(exc_ctx.value) - - def test_run_commands(self): - def test(use_sudo): - commands = ['command1', 'command2'] - self._run( - commands=commands, - use_sudo=use_sudo) - assert all(item in self.mock.settings_merged.items() for - item in self.default_fabric_env.items()) - assert self.mock.settings_merged['warn_only'] is True - assert self.mock.settings_merged['use_sudo'] == use_sudo - assert self.mock.commands == commands - self.mock.settings_merged = {} - self.mock.commands = [] - test(use_sudo=False) - test(use_sudo=True) - - def test_failed_command(self): - with pytest.raises(ProcessException) as exc_ctx: - self._run(commands=['fail']) - exception = exc_ctx.value - assert exception.stdout == self.MockCommandResult.stdout - assert exception.stderr == self.MockCommandResult.stderr - assert exception.command == self.MockCommandResult.command - assert exception.exit_code == self.MockCommandResult.return_code - - class MockCommandResult(object): - stdout = 'mock_stdout' - stderr = 'mock_stderr' - command = 'mock_command' - return_code = 1 - - def __init__(self, failed): - self.failed = failed - - class MockFabricApi(object): - - def __init__(self): - self.commands = [] - self.settings_merged = {} - - @contextlib.contextmanager - def settings(self, *args, **kwargs): - self.settings_merged.update(kwargs) - if args: - groups = args[0] - self.settings_merged.update({'hide_output': groups}) - yield - - def run(self, command): - self.commands.append(command) - self.settings_merged['use_sudo'] = False - return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail') - - def sudo(self, command): - self.commands.append(command) - self.settings_merged['use_sudo'] = True - return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail') - - def hide(self, *groups): - return groups - - def exists(self, *args, **kwargs): - raise RuntimeError - - class _Ctx(object): - INSTRUMENTATION_FIELDS = () - - class Task(object): - @staticmethod - def abort(message=None): - models.Task.abort(message) - actor = None - - class Actor(object): - host = None - - class Model(object): - @contextlib.contextmanager - def instrument(self, *args, **kwargs): - yield - task = Task - task.actor = Actor - model = Model() - logger = logging.getLogger() - - @staticmethod - @contextlib.contextmanager - def _mock_self_logging(*args, **kwargs): - yield - _Ctx.logging_handlers = _mock_self_logging - - @pytest.fixture(autouse=True) - def _setup(self, mocker): - self.default_fabric_env = { - 'host_string': 'test', - 'user': 'test', - 'key_filename': 'test', - } - self.mock = self.MockFabricApi() - mocker.patch('fabric.api', self.mock) - - def _run(self, - commands=(), - fabric_env=None, - process=None, - use_sudo=False, - hide_output=None): - operations.run_commands_with_ssh( - ctx=self._Ctx, - commands=commands, - process=process, - fabric_env=fabric_env or self.default_fabric_env, - use_sudo=use_sudo, - hide_output=hide_output) - - -class TestUtilityFunctions(object): - - def test_paths(self): - base_dir = '/path' - local_script_path = '/local/script/path.py' - paths = ssh_operations._Paths(base_dir=base_dir, - local_script_path=local_script_path) - assert paths.local_script_path == local_script_path - assert paths.remote_ctx_dir == base_dir - assert paths.base_script_path == 'path.py' - assert paths.remote_ctx_path == '/path/ctx' - assert paths.remote_scripts_dir == '/path/scripts' - assert paths.remote_work_dir == '/path/work' - assert paths.remote_env_script_path.startswith('/path/scripts/env-path.py-') - assert paths.remote_script_path.startswith('/path/scripts/path.py-') - - def test_write_environment_script_file(self): - base_dir = '/path' - local_script_path = '/local/script/path.py' - paths = ssh_operations._Paths(base_dir=base_dir, - local_script_path=local_script_path) - env = {'one': "'1'"} - local_socket_url = 'local_socket_url' - remote_socket_url = 'remote_socket_url' - env_script_lines = set([l for l in ssh_operations._write_environment_script_file( - process={'env': env}, - paths=paths, - local_socket_url=local_socket_url, - remote_socket_url=remote_socket_url - ).getvalue().split('\n') if l]) - expected_env_script_lines = set([ - 'export PATH=/path:$PATH', - 'export PYTHONPATH=/path:$PYTHONPATH', - 'chmod +x /path/ctx', - 'chmod +x {0}'.format(paths.remote_script_path), - 'export CTX_SOCKET_URL={0}'.format(remote_socket_url), - 'export LOCAL_CTX_SOCKET_URL={0}'.format(local_socket_url), - 'export one=\'1\'' - ]) - assert env_script_lines == expected_env_script_lines http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 780db07..d0b85d3 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -26,7 +26,7 @@ def op_path(func, module_path=None): def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) + graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph) eng = engine.Engine(executors={executor.__class__: executor}) eng.execute(workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 091e23c..8e08e72 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir): context.model.node.update(node) graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) + graph_compiler.GraphCompiler(context.execution, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution/__init__.py b/tests/orchestrator/execution/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/orchestrator/execution/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution/test_execution_compiler.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution/test_execution_compiler.py b/tests/orchestrator/execution/test_execution_compiler.py new file mode 100644 index 0000000..b044872 --- /dev/null +++ b/tests/orchestrator/execution/test_execution_compiler.py @@ -0,0 +1,698 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import time +from threading import Thread, Event +from datetime import datetime + +import mock +import pytest + +from aria.modeling import exceptions as modeling_exceptions +from aria.modeling import models +from aria.orchestrator import exceptions +from aria.orchestrator import events +from aria.orchestrator import execution as orch_execution +from aria.orchestrator.workflows.executor.process import ProcessExecutor +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.core import engine, graph_compiler +from aria.orchestrator.workflows.executor import thread +from aria.orchestrator import ( + workflow, + operation, +) + +from tests import ( + mock as tests_mock, + storage +) + +from ...fixtures import ( # pylint: disable=unused-import + plugins_dir, + plugin_manager, + fs_model as model, + resource_storage as resource +) + +custom_events = { + 'is_resumed': Event(), + 'is_active': Event(), + 'execution_cancelled': Event(), + 'execution_failed': Event(), +} + + +class TimeoutError(BaseException): + pass + + +class FailingTask(BaseException): + pass + + +def test_undeclared_workflow(request): + # validating a proper error is raised when the workflow is not declared in the service + with pytest.raises(exceptions.UndeclaredWorkflowError): + _get_compiler(request, 'undeclared_workflow').compile() + + +def test_missing_workflow_implementation(service, request): + # validating a proper error is raised when the workflow code path does not exist + workflow = models.Operation( + name='test_workflow', + service=service, + function='nonexistent.workflow.implementation') + service.workflows['test_workflow'] = workflow + + with pytest.raises(exceptions.WorkflowImplementationNotFoundError): + _get_compiler(request, 'test_workflow').compile() + + +def test_builtin_workflow_instantiation(request, model): + # validates the workflow runner instantiates properly when provided with a builtin workflow + # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) + execution = _get_compiler(request, 'install').compile() + assert len(execution.tasks) == 18 # expecting 18 tasks for 2 node topology + + +def test_custom_workflow_instantiation(request): + # validates the workflow runner instantiates properly when provided with a custom workflow + # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) + mock_workflow = _setup_mock_workflow_in_service(request) + execution = _get_compiler(request, mock_workflow).compile() + assert len(execution.tasks) == 2 # mock workflow creates only start workflow and end workflow task + + +def test_existing_active_executions(request, service, model): + existing_active_execution = models.Execution( + service=service, + status=models.Execution.STARTED, + workflow_name='uninstall') + model.execution.put(existing_active_execution) + with pytest.raises(exceptions.ActiveExecutionsError): + _get_compiler(request, 'install').compile() + + +def test_existing_executions_but_no_active_ones(request, service, model): + existing_terminated_execution = models.Execution( + service=service, + status=models.Execution.SUCCEEDED, + workflow_name='uninstall') + model.execution.put(existing_terminated_execution) + # no active executions exist, so no error should be raised + _get_compiler(request, 'install').compile() + + +def test_default_executor(request): + # validates the ProcessExecutor is used by the workflow runner by default + mock_workflow = _setup_mock_workflow_in_service(request) + + with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls: + execution = _get_compiler(request, mock_workflow).compile() + orch_execution.ExecutionRunner(ProcessExecutor()) + _, engine_kwargs = mock_engine_cls.call_args + assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor) + + +def test_custom_executor(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + custom_executor = mock.MagicMock() + with mock.patch('aria.orchestrator.execution.runner.engine.Engine') as mock_engine_cls: + execution = _get_compiler(request, mock_workflow).compile(executor=custom_executor) + orch_execution.ExecutionRunner(custom_executor) + _, engine_kwargs = mock_engine_cls.call_args + assert engine_kwargs.get('executors').values()[0] == custom_executor + + +def test_task_configuration_parameters(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + task_max_attempts = 5 + task_retry_interval = 7 + with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute') as \ + mock_engine_execute: + compiler = _get_compiler(request, mock_workflow) + execution = compiler.compile(task_max_attempts=task_max_attempts, + task_retry_interval=task_retry_interval) + orch_execution.ExecutionRunner(ProcessExecutor()).execute(compiler.workflow_ctx) + _, engine_kwargs = mock_engine_execute.call_args + assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts + assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval + + +def test_execute(request, service): + mock_workflow = _setup_mock_workflow_in_service(request) + + mock_engine = mock.MagicMock() + with mock.patch('aria.orchestrator.execution.runner.engine.Engine.execute', + return_value=mock_engine) as mock_engine_execute: + compiler = _get_compiler(request, mock_workflow).compile() + compiler.compile() + + runner = orch_execution.runner.ExecutionRunner(ProcessExecutor()) + runner.execute(compiler.workflow_ctx) + + _, engine_kwargs = mock_engine_execute.call_args + assert engine_kwargs['ctx'].service.id == service.id + assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow' + + mock_engine_execute.assert_called_once_with(ctx=compiler.workflow_ctx, + resuming=False, + retry_failed=False) + + +def test_cancel_execution(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + mock_engine = mock.MagicMock() + with mock.patch('aria.orchestrator.execution.runner.engine.Engine', return_value=mock_engine): + compiler = _get_compiler(request, mock_workflow) + execution = compiler.compile() + + runner = orch_execution.ExecutionRunner(ProcessExecutor()) + runner.cancel(ctx=compiler.workflow_ctx) + mock_engine.cancel_execution.assert_called_once_with(compiler.workflow_ctx) + + +def test_execution_model_creation(request, service): + mock_workflow = _setup_mock_workflow_in_service(request) + + with mock.patch('aria.orchestrator.execution.runner.engine.Engine'): + execution = _get_compiler(request, mock_workflow).compile() + + assert execution.service.id == service.id + assert execution.workflow_name == mock_workflow + assert execution.created_at <= datetime.utcnow() + assert execution.inputs == dict() + + +def test_execution_inputs_override_workflow_inputs(request): + wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5} + mock_workflow = _setup_mock_workflow_in_service( + request, + inputs=dict((name, models.Input.wrap(name, val)) for name, val + in wf_inputs.iteritems())) + + with mock.patch('aria.orchestrator.execution.runner.engine.Engine'): + execution = _get_compiler(request, mock_workflow).compile( + execution_inputs={'input2': 'overriding-value2', 'input3': 7} + ) + + assert len(execution.inputs) == 3 + # did not override input1 - expecting the default value from the workflow inputs + assert execution.inputs['input1'].value == 'value1' + # overrode input2 + assert execution.inputs['input2'].value == 'overriding-value2' + # overrode input of integer type + assert execution.inputs['input3'].value == 7 + + +def test_execution_inputs_undeclared_inputs(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + with pytest.raises(modeling_exceptions.UndeclaredInputsException): + _get_compiler(request, mock_workflow).compile( + execution_inputs={'undeclared_input': 'value'}) + + +def test_execution_inputs_missing_required_inputs(request): + mock_workflow = _setup_mock_workflow_in_service( + request, inputs={'required_input': models.Input.wrap('required_input', value=None)}) + + with pytest.raises(modeling_exceptions.MissingRequiredInputsException): + _get_compiler(request, mock_workflow).compile(execution_inputs={}) + + +def test_execution_inputs_wrong_type_inputs(request): + mock_workflow = _setup_mock_workflow_in_service( + request, inputs={'input': models.Input.wrap('input', 'value')}) + + with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException): + _get_compiler(request, mock_workflow).compile(execution_inputs={'input': 5}) + + +def test_execution_inputs_builtin_workflow_with_inputs(request): + # built-in workflows don't have inputs + with pytest.raises(modeling_exceptions.UndeclaredInputsException): + _get_compiler(request, 'install').compile(execution_inputs={'undeclared_input': 'value'}) + + +def test_workflow_function_parameters(request, tmpdir): + # validating the workflow function is passed with the + # merged execution inputs, in dict form + + # the workflow function parameters will be written to this file + output_path = str(tmpdir.join('output')) + wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5} + + mock_workflow = _setup_mock_workflow_in_service( + request, inputs=dict((name, models.Input.wrap(name, val)) for name, val + in wf_inputs.iteritems())) + + _get_compiler(request, mock_workflow).compile( + execution_inputs={'input2': 'overriding-value2', 'input3': 7}) + + with open(output_path) as f: + wf_call_kwargs = json.load(f) + assert len(wf_call_kwargs) == 3 + assert wf_call_kwargs.get('input1') == 'value1' + assert wf_call_kwargs.get('input2') == 'overriding-value2' + assert wf_call_kwargs.get('input3') == 7 + + [email protected] +def service(model): + # sets up a service in the storage + service_id = tests_mock.topology.create_simple_topology_two_nodes(model) + service = model.service.get(service_id) + return service + + +def _setup_mock_workflow_in_service(request, inputs=None): + # sets up a mock workflow as part of the service, including uploading + # the workflow code to the service's dir on the resource storage + service = request.getfixturevalue('service') + resource = request.getfixturevalue('resource') + + source = tests_mock.workflow.__file__ + resource.service_template.upload(str(service.service_template.id), source) + mock_workflow_name = 'test_workflow' + arguments = {} + if inputs: + for input in inputs.itervalues(): + arguments[input.name] = input.as_argument() + workflow = models.Operation( + name=mock_workflow_name, + service=service, + function='workflow.mock_workflow', + inputs=inputs or {}, + arguments=arguments) + service.workflows[mock_workflow_name] = workflow + return mock_workflow_name + + +def _get_compiler(request, workflow_name): + # helper method for instantiating a workflow runner + service = request.getfixturevalue('service') + model = request.getfixturevalue('model') + resource = request.getfixturevalue('resource') + plugin_manager = request.getfixturevalue('plugin_manager') + + return orch_execution.ExecutionCompiler( + model, + resource, + plugin_manager, + service, + workflow_name + ) + + +class TestResumableWorkflows(object): + + def _create_initial_workflow_runner( + self, + model, + resource, + service, + workflow, + executor, + inputs=None): + + service.workflows['custom_workflow'] = tests_mock.models.create_operation( + 'custom_workflow', + operation_kwargs={ + 'function': '{0}.{1}'.format(__name__, workflow.__name__), + 'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items()) + } + ) + model.service.update(service) + compiler = orch_execution.ExecutionCompiler( + model, resource, None, service, 'custom_workflow' + ) + execution = compiler.compile(inputs, executor) + model.execution.update(execution) + + return orch_execution.ExecutionRunner(executor), compiler.workflow_ctx + + @staticmethod + def _wait_for_active_and_cancel(execution_runner, ctx): + if custom_events['is_active'].wait(60) is False: + raise TimeoutError("is_active wasn't set to True") + execution_runner.cancel(ctx) + if custom_events['execution_cancelled'].wait(60) is False: + raise TimeoutError("Execution did not end") + + def test_resume_workflow(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_pass_first_task_only) + runner, ctx = self._create_initial_workflow_runner( + workflow_context.model, + workflow_context.resource, + workflow_context.model.service.list()[0], + mock_parallel_tasks_workflow, + thread_executor, + inputs={'number_of_tasks': 2} + ) + + wf_thread = Thread(target=runner.execute, kwargs=dict(ctx=ctx)) + wf_thread.daemon = True + wf_thread.start() + + # Wait for the execution to start + self._wait_for_active_and_cancel(runner, ctx) + node = ctx.model.node.refresh(node) + + tasks = ctx.model.task.list(filters={'_stub_type': None}) + assert any(task.status == task.SUCCESS for task in tasks) + assert any(task.status == task.RETRYING for task in tasks) + custom_events['is_resumed'].set() + assert any(task.status == task.RETRYING for task in tasks) + + # Create a new workflow runner, with an existing execution id. This would cause + # the old execution to restart. + new_wf_runner = orch_execution.ExecutionRunner(thread_executor, True) + new_wf_runner.execute(ctx) + + # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) + assert all(task.status == task.SUCCESS for task in tasks) + assert node.attributes['invocations'].value == 3 + assert ctx.execution.status == ctx.execution.SUCCEEDED + + def test_resume_started_task(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_stuck_task) + + wf_runner, ctx = self._create_initial_workflow_runner( + workflow_context.model, + workflow_context.resource, + workflow_context.model.service.list()[0], + mock_parallel_tasks_workflow, + thread_executor, + inputs={'number_of_tasks': 1} + ) + + wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx)) + wf_thread.daemon = True + wf_thread.start() + + self._wait_for_active_and_cancel(wf_runner, ctx) + node = workflow_context.model.node.refresh(node) + task = workflow_context.model.task.list(filters={'_stub_type': None})[0] + assert node.attributes['invocations'].value == 1 + assert task.status == task.STARTED + assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING) + custom_events['is_resumed'].set() + + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True) + + new_wf_runner.execute(ctx) + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 2 + assert task.status == task.SUCCESS + assert ctx.execution.status == ctx.execution.SUCCEEDED + + def test_resume_failed_task(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_failed_before_resuming) + + wf_runner, ctx = self._create_initial_workflow_runner( + workflow_context.model, + workflow_context.resource, + workflow_context.model.service.list()[0], + mock_parallel_tasks_workflow, + thread_executor) + wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx)) + wf_thread.setDaemon(True) + wf_thread.start() + + self._wait_for_active_and_cancel(wf_runner, ctx) + node = workflow_context.model.node.refresh(node) + + task = workflow_context.model.task.list(filters={'_stub_type': None})[0] + assert node.attributes['invocations'].value == 2 + assert task.status == task.STARTED + assert ctx.execution.status in (ctx.execution.CANCELLED, ctx.execution.CANCELLING) + + custom_events['is_resumed'].set() + assert node.attributes['invocations'].value == 2 + + # Create a new workflow runner, with an existing execution id. This would cause + # the old execution to restart. + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True) + + new_wf_runner.execute(ctx) + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == task.max_attempts - 1 + assert task.status == task.SUCCESS + assert ctx.execution.status == ctx.execution.SUCCEEDED + + def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_pass_first_task_only) + + wf_runner, ctx = self._create_initial_workflow_runner( + workflow_context.model, + workflow_context.resource, + workflow_context.model.service.list()[0], + mock_parallel_tasks_workflow, + thread_executor, + inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2} + ) + wf_thread = Thread(target=wf_runner.execute, kwargs=dict(ctx=ctx)) + wf_thread.setDaemon(True) + wf_thread.start() + + if custom_events['execution_failed'].wait(60) is False: + raise TimeoutError("Execution did not end") + + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 3 + failed_task = [t for t in tasks if t.status == t.FAILED][0] + + # First task passes + assert any(task.status == task.FAILED for task in tasks) + assert failed_task.attempts_count == 2 + # Second task fails + assert any(task.status == task.SUCCESS for task in tasks) + assert ctx.execution.status in ctx.execution.FAILED + + custom_events['is_resumed'].set() + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, True, True) + new_wf_runner.execute(ctx) + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) + assert failed_task.attempts_count == 1 + assert node.attributes['invocations'].value == 4 + assert all(task.status == task.SUCCESS for task in tasks) + assert ctx.execution.status == ctx.execution.SUCCEEDED + + def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_fail_first_task_only) + + execution_runner, ctx = self._create_initial_workflow_runner( + workflow_context.model, + workflow_context.resource, + workflow_context.model.service.list()[0], + mock_sequential_tasks_workflow, + thread_executor, + inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2} + ) + wf_thread = Thread(target=execution_runner.execute, kwargs=dict(ctx=ctx)) + wf_thread.setDaemon(True) + wf_thread.start() + + if custom_events['execution_failed'].wait(60) is False: + raise TimeoutError("Execution did not end") + + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 1 + assert any(t.status == t.FAILED for t in tasks) + assert any(t.status == t.PENDING for t in tasks) + + custom_events['is_resumed'].set() + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = orch_execution.ExecutionRunner(new_thread_executor, resume=True) + new_wf_runner.execute(ctx) + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 2 + assert any(t.status == t.SUCCESS for t in tasks) + assert any(t.status == t.FAILED for t in tasks) + assert ctx.execution.status == ctx.execution.SUCCEEDED + + @staticmethod + @pytest.fixture + def thread_executor(): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + @staticmethod + @pytest.fixture + def workflow_context(tmpdir): + workflow_context = tests_mock.context.simple(str(tmpdir)) + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + @staticmethod + def _create_interface(ctx, node, func, arguments=None): + interface_name = 'aria.interfaces.lifecycle' + operation_kwargs = dict(function='{name}.{func.__name__}'.format( + name=__name__, func=func)) + if arguments: + # the operation has to declare the arguments before those may be passed + operation_kwargs['arguments'] = arguments + operation_name = 'create' + interface = tests_mock.models.create_interface(node.service, interface_name, operation_name, + operation_kwargs=operation_kwargs) + node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + return node, interface_name, operation_name + + @staticmethod + def _engine(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + execution = workflow_context.execution + graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile(graph) + workflow_context.execution = execution + + return engine.Engine(executors={executor.__class__: executor}) + + @pytest.fixture(autouse=True) + def register_to_events(self): + def execution_cancelled(*args, **kwargs): + custom_events['execution_cancelled'].set() + + def execution_failed(*args, **kwargs): + custom_events['execution_failed'].set() + + events.on_cancelled_workflow_signal.connect(execution_cancelled) + events.on_failure_workflow_signal.connect(execution_failed) + yield + events.on_cancelled_workflow_signal.disconnect(execution_cancelled) + events.on_failure_workflow_signal.disconnect(execution_failed) + for event in custom_events.values(): + event.clear() + + +@workflow +def mock_sequential_tasks_workflow(ctx, graph, + retry_interval=1, max_attempts=10, number_of_tasks=1): + node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks)) + + +@workflow +def mock_parallel_tasks_workflow(ctx, graph, + retry_interval=1, max_attempts=10, number_of_tasks=1): + node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks)) + + +def _create_tasks(node, retry_interval, max_attempts, number_of_tasks): + return [ + api.task.OperationTask(node, + 'aria.interfaces.lifecycle', + 'create', + retry_interval=retry_interval, + max_attempts=max_attempts) + for _ in xrange(number_of_tasks) + ] + + +@operation +def mock_failed_before_resuming(ctx): + """ + The task should run atmost ctx.task.max_attempts - 1 times, and only then pass. + overall, the number of invocations should be ctx.task.max_attempts - 1 + """ + ctx.node.attributes['invocations'] += 1 + + if ctx.node.attributes['invocations'] == 2: + custom_events['is_active'].set() + # unfreeze the thread only when all of the invocations are done + while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1: + time.sleep(5) + + elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1: + # pass only just before the end. + return + else: + # fail o.w. + raise FailingTask("stop this task") + + +@operation +def mock_stuck_task(ctx): + ctx.node.attributes['invocations'] += 1 + while not custom_events['is_resumed'].isSet(): + if not custom_events['is_active'].isSet(): + custom_events['is_active'].set() + time.sleep(5) + + +@operation +def mock_pass_first_task_only(ctx): + ctx.node.attributes['invocations'] += 1 + + if ctx.node.attributes['invocations'] != 1: + custom_events['is_active'].set() + if not custom_events['is_resumed'].isSet(): + # if resume was called, increase by one. o/w fail the execution - second task should + # fail as long it was not a part of resuming the workflow + raise FailingTask("wasn't resumed yet") + + +@operation +def mock_fail_first_task_only(ctx): + ctx.node.attributes['invocations'] += 1 + + if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1: + raise FailingTask("First task should fail") http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 7f33318..467ed36 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -500,7 +500,8 @@ if __name__ == '__main__': arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter - graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph) + graph_compiler.GraphCompiler(workflow_context.execution, executor.__class__).compile( + tasks_graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(workflow_context) return workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/da233c73/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index b5df939..8992a04 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -262,7 +262,7 @@ class TestWithActualSSHServer(object): return graph tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter graph_compiler.GraphCompiler( - self._workflow_context, self._executor.__class__).compile(tasks_graph) + self._workflow_context.execution, self._executor.__class__).compile(tasks_graph) eng = engine.Engine({self._executor.__class__: self._executor}) eng.execute(self._workflow_context) return self._workflow_context.model.node.get_by_name(
