Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-1-parser-test-suite 0cf1deafc -> 1dfb81c8a (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e71ddc9b/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
deleted file mode 100644
index 011c4cc..0000000
--- a/tests/orchestrator/test_workflow_runner.py
+++ /dev/null
@@ -1,726 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import json
-import time
-from threading import Thread, Event
-from datetime import datetime
-
-import mock
-import pytest
-
-from aria.modeling import exceptions as modeling_exceptions
-from aria.modeling import models
-from aria.orchestrator import exceptions
-from aria.orchestrator import events
-from aria.orchestrator.workflow_runner import WorkflowRunner
-from aria.orchestrator.workflows.executor.process import ProcessExecutor
-from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine, graph_compiler
-from aria.orchestrator.workflows.executor import thread
-from aria.orchestrator import (
-    workflow,
-    operation,
-)
-
-from tests import (
-    mock as tests_mock,
-    storage
-)
-
-from ..fixtures import (  # pylint: disable=unused-import
-    plugins_dir,
-    plugin_manager,
-    fs_model as model,
-    resource_storage as resource
-)
-
-custom_events = {
-    'is_resumed': Event(),
-    'is_active': Event(),
-    'execution_cancelled': Event(),
-    'execution_failed': Event(),
-}
-
-
-class TimeoutError(BaseException):
-    pass
-
-
-class FailingTask(BaseException):
-    pass
-
-
-def test_undeclared_workflow(request):
-    # validating a proper error is raised when the workflow is not declared in the service
-    with pytest.raises(exceptions.UndeclaredWorkflowError):
-        _create_workflow_runner(request, 'undeclared_workflow')
-
-
-def test_missing_workflow_implementation(service, request):
-    # validating a proper error is raised when the workflow code path does not exist
-    workflow = models.Operation(
-        name='test_workflow',
-        service=service,
-        function='nonexistent.workflow.implementation')
-    service.workflows['test_workflow'] = workflow
-
-    with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
-        _create_workflow_runner(request, 'test_workflow')
-
-
-def test_builtin_workflow_instantiation(request):
-    # validates the workflow runner instantiates properly when provided with a builtin workflow
-    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
-    workflow_runner = _create_workflow_runner(request, 'install')
-    tasks = list(workflow_runner.execution.tasks)
-    assert len(tasks) == 18  # expecting 18 tasks for 2 node topology
-
-
-def test_custom_workflow_instantiation(request):
-    # validates the workflow runner instantiates properly when provided with a custom workflow
-    # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
-    mock_workflow = _setup_mock_workflow_in_service(request)
-    workflow_runner = _create_workflow_runner(request, mock_workflow)
-    tasks = list(workflow_runner.execution.tasks)
-    assert len(tasks) == 2  # mock workflow creates only start workflow and end workflow task
-
-
-def test_existing_active_executions(request, service, model):
-    existing_active_execution = models.Execution(
-        service=service,
-        status=models.Execution.STARTED,
-        workflow_name='uninstall')
-    model.execution.put(existing_active_execution)
-    with pytest.raises(exceptions.ActiveExecutionsError):
-        _create_workflow_runner(request, 'install')
-
-
-def test_existing_executions_but_no_active_ones(request, service, model):
-    existing_terminated_execution = models.Execution(
-        service=service,
-        status=models.Execution.SUCCEEDED,
-        workflow_name='uninstall')
-    model.execution.put(existing_terminated_execution)
-    # no active executions exist, so no error should be raised
-    _create_workflow_runner(request, 'install')
-
-
-def test_default_executor(request):
-    # validates the ProcessExecutor is used by the workflow runner by default
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
-        _create_workflow_runner(request, mock_workflow)
-        _, engine_kwargs = mock_engine_cls.call_args
-        assert isinstance(engine_kwargs.get('executors').values()[0], ProcessExecutor)
-
-
-def test_custom_executor(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    custom_executor = mock.MagicMock()
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine') as mock_engine_cls:
-        _create_workflow_runner(request, mock_workflow, executor=custom_executor)
-        _, engine_kwargs = mock_engine_cls.call_args
-        assert engine_kwargs.get('executors').values()[0] == custom_executor
-
-
-def test_task_configuration_parameters(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    task_max_attempts = 5
-    task_retry_interval = 7
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute') as \
-            mock_engine_execute:
-        _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts,
-                                task_retry_interval=task_retry_interval).execute()
-        _, engine_kwargs = mock_engine_execute.call_args
-        assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts
-        assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval
-
-
-def test_execute(request, service):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    mock_engine = mock.MagicMock()
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine.execute',
-                    return_value=mock_engine) as mock_engine_execute:
-        workflow_runner = _create_workflow_runner(request, mock_workflow)
-        workflow_runner.execute()
-
-        _, engine_kwargs = mock_engine_execute.call_args
-        assert engine_kwargs['ctx'].service.id == service.id
-        assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
-
-        mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
-                                                    resuming=False,
-                                                    retry_failed=False)
-
-
-def test_cancel_execution(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    mock_engine = mock.MagicMock()
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine', return_value=mock_engine):
-        workflow_runner = _create_workflow_runner(request, mock_workflow)
-        workflow_runner.cancel()
-        mock_engine.cancel_execution.assert_called_once_with(ctx=workflow_runner._workflow_context)
-
-
-def test_execution_model_creation(request, service, model):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
-        workflow_runner = _create_workflow_runner(request, mock_workflow)
-
-        assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution
-        assert workflow_runner.execution.service.id == service.id
-        assert workflow_runner.execution.workflow_name == mock_workflow
-        assert workflow_runner.execution.created_at <= datetime.utcnow()
-        assert workflow_runner.execution.inputs == dict()
-
-
-def test_execution_inputs_override_workflow_inputs(request):
-    wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5}
-    mock_workflow = _setup_mock_workflow_in_service(
-        request,
-        inputs=dict((name, models.Input.wrap(name, val)) for name, val
-                    in wf_inputs.iteritems()))
-
-    with mock.patch('aria.orchestrator.workflow_runner.engine.Engine'):
-        workflow_runner = _create_workflow_runner(
-            request, mock_workflow, inputs={'input2': 'overriding-value2', 'input3': 7})
-
-        assert len(workflow_runner.execution.inputs) == 3
-        # did not override input1 - expecting the default value from the workflow inputs
-        assert workflow_runner.execution.inputs['input1'].value == 'value1'
-        # overrode input2
-        assert workflow_runner.execution.inputs['input2'].value == 'overriding-value2'
-        # overrode input of integer type
-        assert workflow_runner.execution.inputs['input3'].value == 7
-
-
-def test_execution_inputs_undeclared_inputs(request):
-    mock_workflow = _setup_mock_workflow_in_service(request)
-
-    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
-        _create_workflow_runner(request, mock_workflow, inputs={'undeclared_input': 'value'})
-
-
-def test_execution_inputs_missing_required_inputs(request):
-    mock_workflow = _setup_mock_workflow_in_service(
-        request, inputs={'required_input': models.Input.wrap('required_input', value=None)})
-
-    with pytest.raises(modeling_exceptions.MissingRequiredInputsException):
-        _create_workflow_runner(request, mock_workflow, inputs={})
-
-
-def test_execution_inputs_wrong_type_inputs(request):
-    mock_workflow = _setup_mock_workflow_in_service(
-        request, inputs={'input': models.Input.wrap('input', 'value')})
-
-    with pytest.raises(modeling_exceptions.ParametersOfWrongTypeException):
-        _create_workflow_runner(request, mock_workflow, inputs={'input': 5})
-
-
-def test_execution_inputs_builtin_workflow_with_inputs(request):
-    # built-in workflows don't have inputs
-    with pytest.raises(modeling_exceptions.UndeclaredInputsException):
-        _create_workflow_runner(request, 'install', inputs={'undeclared_input': 'value'})
-
-
-def test_workflow_function_parameters(request, tmpdir):
-    # validating the workflow function is passed with the
-    # merged execution inputs, in dict form
-
-    # the workflow function parameters will be written to this file
-    output_path = str(tmpdir.join('output'))
-    wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5}
-
-    mock_workflow = _setup_mock_workflow_in_service(
-        request, inputs=dict((name, models.Input.wrap(name, val)) for name, val
-                             in wf_inputs.iteritems()))
-
-    _create_workflow_runner(request, mock_workflow,
-                            inputs={'input2': 'overriding-value2', 'input3': 7})
-
-    with open(output_path) as f:
-        wf_call_kwargs = json.load(f)
-    assert len(wf_call_kwargs) == 3
-    assert wf_call_kwargs.get('input1') == 'value1'
-    assert wf_call_kwargs.get('input2') == 'overriding-value2'
-    assert wf_call_kwargs.get('input3') == 7
-
-
-@pytest.fixture
-def service(model):
-    # sets up a service in the storage
-    service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
-    service = model.service.get(service_id)
-    return service
-
-
-def _setup_mock_workflow_in_service(request, inputs=None):
-    # sets up a mock workflow as part of the service, including uploading
-    # the workflow code to the service's dir on the resource storage
-    service = request.getfixturevalue('service')
-    resource = request.getfixturevalue('resource')
-
-    source = tests_mock.workflow.__file__
-    resource.service_template.upload(str(service.service_template.id), source)
-    mock_workflow_name = 'test_workflow'
-    arguments = {}
-    if inputs:
-        for input in inputs.itervalues():
-            arguments[input.name] = input.as_argument()
-    workflow = models.Operation(
-        name=mock_workflow_name,
-        service=service,
-        function='workflow.mock_workflow',
-        inputs=inputs or {},
-        arguments=arguments)
-    service.workflows[mock_workflow_name] = workflow
-    return mock_workflow_name
-
-
-def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
-                            task_max_attempts=None, task_retry_interval=None):
-    # helper method for instantiating a workflow runner
-    service_id = request.getfixturevalue('service').id
-    model = request.getfixturevalue('model')
-    resource = request.getfixturevalue('resource')
-    plugin_manager = request.getfixturevalue('plugin_manager')
-
-    # task configuration parameters can't be set to None, therefore only
-    # passing those if they've been set by the test
-    task_configuration_kwargs = dict()
-    if task_max_attempts is not None:
-        task_configuration_kwargs['task_max_attempts'] = task_max_attempts
-    if task_retry_interval is not None:
-        task_configuration_kwargs['task_retry_interval'] = task_retry_interval
-
-    return WorkflowRunner(
-        workflow_name=workflow_name,
-        service_id=service_id,
-        inputs=inputs or {},
-        executor=executor,
-        model_storage=model,
-        resource_storage=resource,
-        plugin_manager=plugin_manager,
-        **task_configuration_kwargs)
-
-
-class TestResumableWorkflows(object):
-
-    def _create_initial_workflow_runner(
-            self, workflow_context, workflow, executor, inputs=None):
-
-        service = workflow_context.service
-        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
-            'custom_workflow',
-            operation_kwargs={
-                'function': '{0}.{1}'.format(__name__, workflow.__name__),
-                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
-            }
-        )
-        workflow_context.model.service.update(service)
-
-        wf_runner = WorkflowRunner(
-            service_id=workflow_context.service.id,
-            inputs=inputs or {},
-            model_storage=workflow_context.model,
-            resource_storage=workflow_context.resource,
-            plugin_manager=None,
-            workflow_name='custom_workflow',
-            executor=executor)
-        return wf_runner
-
-    @staticmethod
-    def _wait_for_active_and_cancel(workflow_runner):
-        if custom_events['is_active'].wait(60) is False:
-            raise TimeoutError("is_active wasn't set to True")
-        workflow_runner.cancel()
-        if custom_events['execution_cancelled'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
-
-    def test_resume_workflow(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_pass_first_task_only)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_tasks_workflow, thread_executor,
-            inputs={'number_of_tasks': 2})
-
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.daemon = True
-        wf_thread.start()
-
-        # Wait for the execution to start
-        self._wait_for_active_and_cancel(wf_runner)
-        node = workflow_context.model.node.refresh(node)
-
-        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
-        assert any(task.status == task.SUCCESS for task in tasks)
-        assert any(task.status == task.RETRYING for task in tasks)
-        custom_events['is_resumed'].set()
-        assert any(task.status == task.RETRYING for task in tasks)
-
-        # Create a new workflow runner, with an existing execution id. This would cause
-        # the old execution to restart.
-        new_wf_runner = WorkflowRunner(
-            service_id=wf_runner.service.id,
-            inputs={},
-            model_storage=workflow_context.model,
-            resource_storage=workflow_context.resource,
-            plugin_manager=None,
-            execution_id=wf_runner.execution.id,
-            executor=thread_executor)
-
-        new_wf_runner.execute()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert all(task.status == task.SUCCESS for task in tasks)
-        assert node.attributes['invocations'].value == 3
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_resume_started_task(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_stuck_task)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_tasks_workflow, thread_executor,
-            inputs={'number_of_tasks': 1})
-
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.daemon = True
-        wf_thread.start()
-
-        self._wait_for_active_and_cancel(wf_runner)
-        node = workflow_context.model.node.refresh(node)
-        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
-        assert node.attributes['invocations'].value == 1
-        assert task.status == task.STARTED
-        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
-                                              wf_runner.execution.CANCELLING)
-        custom_events['is_resumed'].set()
-
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 2
-        assert task.status == task.SUCCESS
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_resume_failed_task(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_failed_before_resuming)
-
-        wf_runner = self._create_initial_workflow_runner(workflow_context,
-                                                         mock_parallel_tasks_workflow,
-                                                         thread_executor)
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.setDaemon(True)
-        wf_thread.start()
-
-        self._wait_for_active_and_cancel(wf_runner)
-        node = workflow_context.model.node.refresh(node)
-
-        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
-        assert node.attributes['invocations'].value == 2
-        assert task.status == task.STARTED
-        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
-                                              wf_runner.execution.CANCELLING)
-
-        custom_events['is_resumed'].set()
-        assert node.attributes['invocations'].value == 2
-
-        # Create a new workflow runner, with an existing execution id. This would cause
-        # the old execution to restart.
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == task.max_attempts - 1
-        assert task.status == task.SUCCESS
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_pass_first_task_only)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context,
-            mock_parallel_tasks_workflow,
-            thread_executor,
-            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
-        )
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.setDaemon(True)
-        wf_thread.start()
-
-        if custom_events['execution_failed'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
-
-        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 3
-        failed_task = [t for t in tasks if t.status == t.FAILED][0]
-
-        # First task passes
-        assert any(task.status == task.SUCCESS for task in tasks)
-        # Second task fails
-        assert any(task.status == task.FAILED for task in tasks)
-        assert failed_task.attempts_count == 2
-        assert wf_runner.execution.status == wf_runner.execution.FAILED
-
-        custom_events['is_resumed'].set()
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                retry_failed_tasks=True,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert failed_task.attempts_count == 1
-        assert node.attributes['invocations'].value == 4
-        assert all(task.status == task.SUCCESS for task in tasks)
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_fail_first_task_only)
-
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context,
-            mock_sequential_tasks_workflow,
-            thread_executor,
-            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
-        )
-        wf_thread = Thread(target=wf_runner.execute)
-        wf_thread.setDaemon(True)
-        wf_thread.start()
-
-        if custom_events['execution_failed'].wait(60) is False:
-            raise TimeoutError("Execution did not end")
-
-        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 1
-        assert any(t.status == t.FAILED for t in tasks)
-        assert any(t.status == t.PENDING for t in tasks)
-
-        custom_events['is_resumed'].set()
-        new_thread_executor = thread.ThreadExecutor()
-        try:
-            new_wf_runner = WorkflowRunner(
-                service_id=wf_runner.service.id,
-                inputs={},
-                model_storage=workflow_context.model,
-                resource_storage=workflow_context.resource,
-                plugin_manager=None,
-                execution_id=wf_runner.execution.id,
-                executor=new_thread_executor)
-
-            new_wf_runner.execute()
-        finally:
-            new_thread_executor.close()
-
-        # Wait for it to finish and assert changes.
-        node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 2
-        assert any(t.status == t.SUCCESS for t in tasks)
-        assert any(t.status == t.FAILED for t in tasks)
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
-
-
-
-    @staticmethod
-    @pytest.fixture
-    def thread_executor():
-        result = thread.ThreadExecutor()
-        try:
-            yield result
-        finally:
-            result.close()
-
-    @staticmethod
-    @pytest.fixture
-    def workflow_context(tmpdir):
-        workflow_context = tests_mock.context.simple(str(tmpdir))
-        yield workflow_context
-        storage.release_sqlite_storage(workflow_context.model)
-
-    @staticmethod
-    def _create_interface(ctx, node, func, arguments=None):
-        interface_name = 'aria.interfaces.lifecycle'
-        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
-            name=__name__, func=func))
-        if arguments:
-            # the operation has to declare the arguments before those may be passed
-            operation_kwargs['arguments'] = arguments
-        operation_name = 'create'
-        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
-                                                       operation_kwargs=operation_kwargs)
-        node.interfaces[interface.name] = interface
-        ctx.model.node.update(node)
-
-        return node, interface_name, operation_name
-
-    @staticmethod
-    def _engine(workflow_func, workflow_context, executor):
-        graph = workflow_func(ctx=workflow_context)
-        execution = workflow_context.execution
-        graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
-        workflow_context.execution = execution
-
-        return engine.Engine(executors={executor.__class__: executor})
-
-    @pytest.fixture(autouse=True)
-    def register_to_events(self):
-        def execution_cancelled(*args, **kwargs):
-            custom_events['execution_cancelled'].set()
-
-        def execution_failed(*args, **kwargs):
-            custom_events['execution_failed'].set()
-
-        events.on_cancelled_workflow_signal.connect(execution_cancelled)
-        events.on_failure_workflow_signal.connect(execution_failed)
-        yield
-        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
-        events.on_failure_workflow_signal.disconnect(execution_failed)
-        for event in custom_events.values():
-            event.clear()
-
-
-@workflow
-def mock_sequential_tasks_workflow(ctx, graph,
-                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
-    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
-
-
-@workflow
-def mock_parallel_tasks_workflow(ctx, graph,
-                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
-    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
-
-
-def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
-    return [
-        api.task.OperationTask(node,
-                               'aria.interfaces.lifecycle',
-                               'create',
-                               retry_interval=retry_interval,
-                               max_attempts=max_attempts)
-        for _ in xrange(number_of_tasks)
-    ]
-
-
-
-@operation
-def mock_failed_before_resuming(ctx):
-    """
-    The task should run at most ctx.task.max_attempts - 1 times, and only then pass.
-    Overall, the number of invocations should be ctx.task.max_attempts - 1.
-    """
-    ctx.node.attributes['invocations'] += 1
-
-    if ctx.node.attributes['invocations'] == 2:
-        custom_events['is_active'].set()
-        # unfreeze the thread only when all of the invocations are done
-        while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1:
-            time.sleep(5)
-
-    elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1:
-        # pass only just before the end.
-        return
-    else:
-        # otherwise, fail
-        raise FailingTask("stop this task")
-
-
-@operation
-def mock_stuck_task(ctx):
-    ctx.node.attributes['invocations'] += 1
-    while not custom_events['is_resumed'].isSet():
-        if not custom_events['is_active'].isSet():
-            custom_events['is_active'].set()
-        time.sleep(5)
-
-
-@operation
-def mock_pass_first_task_only(ctx):
-    ctx.node.attributes['invocations'] += 1
-
-    if ctx.node.attributes['invocations'] != 1:
-        custom_events['is_active'].set()
-        if not custom_events['is_resumed'].isSet():
-            # if resume was called, increase by one; otherwise fail the execution - the
-            # second task should fail as long as it was not part of resuming the workflow
-            raise FailingTask("wasn't resumed yet")
-
-
-@operation
-def mock_fail_first_task_only(ctx):
-    ctx.node.attributes['invocations'] += 1
-
-    if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
-        raise FailingTask("First task should fail")

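For reference, the deleted suite drove WorkflowRunner through a helper equivalent to the
sketch below (distilled from _create_workflow_runner above; the fixture names are the
suite's own, and a None executor falls back to the default ProcessExecutor, as
test_default_executor verified):

    from aria.orchestrator.workflow_runner import WorkflowRunner

    # Sketch of the call pattern used by the removed tests (not a new API):
    runner = WorkflowRunner(
        workflow_name='install',        # a builtin or service-declared workflow
        service_id=service_id,          # id of a service held in model storage
        inputs={},                      # merged over the workflow's declared inputs
        executor=None,                  # None selects the default ProcessExecutor
        model_storage=model,
        resource_storage=resource,
        plugin_manager=plugin_manager)
    runner.execute()                    # cancel() may be called from another thread
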
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e71ddc9b/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 0c704f5..2c2a06a 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -52,7 +52,7 @@ class BaseTest(object):
         graph = workflow_func(ctx=workflow_context)
         graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph)
 
-        return engine.Engine(executors={executor.__class__: executor})
+        return engine.Engine(executor)
 
     @staticmethod
     def _create_interface(ctx, func, arguments=None):

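The hunk above and the three below make the same call-site change: engine.Engine now
takes the default executor directly instead of a mapping keyed by executor class. A
minimal before/after sketch, assuming only the constructor signature changed:

    # Before: executors supplied as a class-keyed mapping.
    eng = engine.Engine(executors={executor.__class__: executor})

    # After: the default executor is passed directly.
    eng = engine.Engine(executor)
    eng.execute(ctx)
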
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e71ddc9b/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index d804de5..bb5bb75 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -132,7 +132,7 @@ def run_operation_on_node(ctx, op_name, interface_name, executor):
         single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name)
     )
 
-    eng = engine.Engine(executors={executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(ctx)
     return node
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e71ddc9b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index b26fa43..0093976 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -58,7 +58,7 @@ def test_decorate_extension(context, executor):
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
-    eng = engine.Engine({executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(context)
     out = get_node(context).attributes.get('out').value
     assert out['wrapper_arguments'] == arguments

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/e71ddc9b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 47ee2f7..8aaf4ef 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -108,7 +108,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
-    eng = engine.Engine({executor.__class__: executor})
+    eng = engine.Engine(executor)
     eng.execute(context)
     out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')
     return out.value if out else None
