Github user aviyoop commented on a diff in the pull request:
https://github.com/apache/incubator-ariatosca/pull/208#discussion_r152283978
--- Diff: tests/orchestrator/execution/test_execution_compiler.py ---
@@ -296,171 +230,161 @@ def _setup_mock_workflow_in_service(request,
inputs=None):
return mock_workflow_name
-def _create_workflow_runner(request, workflow_name, inputs=None,
executor=None,
- task_max_attempts=None,
task_retry_interval=None):
+def _get_compiler(request, workflow_name):
# helper method for instantiating a workflow runner
- service_id = request.getfixturevalue('service').id
+ service = request.getfixturevalue('service')
model = request.getfixturevalue('model')
resource = request.getfixturevalue('resource')
plugin_manager = request.getfixturevalue('plugin_manager')
- # task configuration parameters can't be set to None, therefore only
- # passing those if they've been set by the test
- task_configuration_kwargs = dict()
- if task_max_attempts is not None:
- task_configuration_kwargs['task_max_attempts'] = task_max_attempts
- if task_retry_interval is not None:
- task_configuration_kwargs['task_retry_interval'] =
task_retry_interval
-
- return WorkflowRunner(
- workflow_name=workflow_name,
- service_id=service_id,
- inputs=inputs or {},
- executor=executor,
- model_storage=model,
- resource_storage=resource,
- plugin_manager=plugin_manager,
- **task_configuration_kwargs)
+ return execution_compiler.ExecutionCompiler(
+ model,
+ resource,
+ plugin_manager,
+ service,
+ workflow_name
+ )
class TestResumableWorkflows(object):
- def _create_initial_workflow_runner(
- self, workflow_context, workflow, executor, inputs=None):
+ def _compile_execution(
+ self,
+ model,
+ resource,
+ service,
+ workflow,
+ executor,
+ inputs=None):
- service = workflow_context.service
service.workflows['custom_workflow'] =
tests_mock.models.create_operation(
'custom_workflow',
operation_kwargs={
'function': '{0}.{1}'.format(__name__, workflow.__name__),
'inputs': dict((k, models.Input.wrap(k, v)) for k, v in
(inputs or {}).items())
}
)
- workflow_context.model.service.update(service)
-
- wf_runner = WorkflowRunner(
- service_id=workflow_context.service.id,
- inputs=inputs or {},
- model_storage=workflow_context.model,
- resource_storage=workflow_context.resource,
- plugin_manager=None,
- workflow_name='custom_workflow',
- executor=executor)
- return wf_runner
+ model.service.update(service)
+ compiler = execution_compiler.ExecutionCompiler(
+ model, resource, None, service, 'custom_workflow'
+ )
+ ctx = compiler.compile(inputs, executor)
+ model.execution.update(ctx.execution)
+
+ return ctx
@staticmethod
- def _wait_for_active_and_cancel(workflow_runner):
+ def _wait_for_active_and_cancel(eng, ctx):
if custom_events['is_active'].wait(60) is False:
raise TimeoutError("is_active wasn't set to True")
- workflow_runner.cancel()
+ eng.cancel_execution(ctx)
if custom_events['execution_cancelled'].wait(60) is False:
raise TimeoutError("Execution did not end")
def test_resume_workflow(self, workflow_context, thread_executor):
node =
workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] =
models.Attribute.wrap('invocations', 0)
self._create_interface(workflow_context, node,
mock_pass_first_task_only)
+ ctx = self._compile_execution(
+ workflow_context.model,
+ workflow_context.resource,
+ workflow_context.model.service.list()[0],
+ mock_parallel_tasks_workflow,
+ thread_executor,
+ inputs={'number_of_tasks': 2}
+ )
- wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_parallel_tasks_workflow,
thread_executor,
- inputs={'number_of_tasks': 2})
+ eng = engine.Engine(thread_executor)
- wf_thread = Thread(target=wf_runner.execute)
+ wf_thread = Thread(target=eng.execute, kwargs=dict(ctx=ctx))
wf_thread.daemon = True
wf_thread.start()
# Wait for the execution to start
- self._wait_for_active_and_cancel(wf_runner)
- node = workflow_context.model.node.refresh(node)
+ self._wait_for_active_and_cancel(eng, ctx)
+ node = ctx.model.node.refresh(node)
- tasks = workflow_context.model.task.list(filters={'_stub_type':
None})
+ tasks = ctx.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
assert any(task.status == task.RETRYING for task in tasks)
custom_events['is_resumed'].set()
assert any(task.status == task.RETRYING for task in tasks)
# Create a new workflow runner, with an existing execution id.
This would cause
--- End diff --
This comment needs to be revised
---