Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-278-Remove-core-tasks ecf5fc9f0 -> 4d9112085 (forced update)
wip2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/4d911208 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/4d911208 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/4d911208 Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: 4d911208504f41e581376d0c5f770fa59571119e Parents: 7f5c620 Author: max-orlov <ma...@gigaspaces.com> Authored: Thu Jun 15 13:49:19 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Jun 15 13:58:43 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflow_runner.py | 4 +- aria/orchestrator/workflows/core/engine.py | 78 ++++++++++---------- tests/orchestrator/test_workflow_runner.py | 31 ++++---- .../orchestrator/workflows/core/test_engine.py | 6 +- .../test_task_graph_into_execution_graph.py | 17 ++--- 5 files changed, 67 insertions(+), 69 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 919da58..c4d8666 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -109,10 +109,10 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute(self._workflow_context) + self._engine.execute(ctx=self._workflow_context) def cancel(self): - self._engine.cancel_execution() + self._engine.cancel_execution(ctx=self._workflow_context) def _create_execution_model(self, inputs): execution = models.Execution( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index b9c3439..ade3661 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -38,75 +38,78 @@ class Engine(logger.LoggerMixin): def __init__(self, executor, **kwargs): super(Engine, self).__init__(**kwargs) self._executors = {executor.__class__: executor} - self._workflow_context = None def execute(self, ctx): """ execute the workflow """ - self._workflow_context = ctx - try: - events.start_workflow_signal.send(self._workflow_context) + events.start_workflow_signal.send(ctx) while True: - cancel = self._is_cancel() + cancel = self._is_cancel(ctx) if cancel: break - for task in self._ended_tasks(): - self._handle_ended_tasks(task) - for task in self._executable_tasks(): - self._handle_executable_task(task) - if self._all_tasks_consumed(): + for task in self._ended_tasks(ctx): + self._handle_ended_tasks(ctx, task) + for task in self._executable_tasks(ctx): + self._handle_executable_task(ctx, task) + if self._all_tasks_consumed(ctx): break else: time.sleep(0.1) if cancel: - events.on_cancelled_workflow_signal.send(self._workflow_context) + events.on_cancelled_workflow_signal.send(ctx) else: - events.on_success_workflow_signal.send(self._workflow_context) + events.on_success_workflow_signal.send(ctx) except BaseException as e: - events.on_failure_workflow_signal.send(self._workflow_context, exception=e) + events.on_failure_workflow_signal.send(ctx, exception=e) raise - def cancel_execution(self): + @staticmethod + def cancel_execution(ctx): """ Send a cancel request to the engine. If execution already started, execution status will be modified to 'cancelling' status. If execution is in pending mode, execution status will be modified to 'cancelled' directly. """ - events.on_cancelling_workflow_signal.send(self._workflow_context) - self._workflow_context.execution = self._workflow_context.execution + events.on_cancelling_workflow_signal.send(ctx) + ctx.execution = ctx.execution - def _is_cancel(self): - execution = self._workflow_context.model.execution.update(self._workflow_context.execution) + @staticmethod + def _is_cancel(ctx): + execution = ctx.model.execution.update(ctx.execution) return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) - def _executable_tasks(self): + def _executable_tasks(self, ctx): now = datetime.utcnow() return ( - task for task in self._tasks_iter() - if task.is_waiting() and task.due_at <= now and not self._task_has_dependencies(task) + task for task in self._tasks_iter(ctx) + if task.is_waiting() and task.due_at <= now and \ + not self._task_has_dependencies(ctx, task) ) - def _ended_tasks(self): - for task in self._tasks_iter(): - if task.has_ended() and task in self._workflow_context.graph: + def _ended_tasks(self, ctx): + for task in self._tasks_iter(ctx): + if task.has_ended() and task in ctx.graph: yield task - def _task_has_dependencies(self, task): + @staticmethod + def _task_has_dependencies(ctx, task): return task.dependencies and \ - all(d in self._workflow_context.graph for d in task.dependencies) + all(d in ctx.graph for d in task.dependencies) - def _all_tasks_consumed(self): - return len(self._workflow_context.graph.node) == 0 + @staticmethod + def _all_tasks_consumed(ctx): + return len(ctx.graph.node) == 0 - def _tasks_iter(self): - for task in self._workflow_context.execution.tasks: + @staticmethod + def _tasks_iter(ctx): + for task in ctx.execution.tasks: if not task.has_ended(): - task = self._workflow_context.model.task.refresh(task) + task = ctx.model.task.refresh(task) yield task - def _handle_executable_task(self, task): + def _handle_executable_task(self, ctx, task): if not task.stub_type: events.sent_task_signal.send(task) @@ -116,9 +119,9 @@ class Engine(logger.LoggerMixin): context_cls = task._context_cls or operation.BaseOperationContext op_ctx = context_cls( - model_storage=self._workflow_context.model, - resource_storage=self._workflow_context.resource, - workdir=self._workflow_context._workdir, + model_storage=ctx.model, + resource_storage=ctx.resource, + workdir=ctx._workdir, task_id=task.id, actor_id=task.actor.id if task.actor else None, service_id=task.execution.service.id, @@ -128,8 +131,9 @@ class Engine(logger.LoggerMixin): executor.execute(op_ctx) - def _handle_ended_tasks(self, task): + @staticmethod + def _handle_ended_tasks(ctx, task): if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') else: - self._workflow_context.graph.remove_node(task) + ctx.graph.remove_node(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index cd50580..bc4bab0 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -117,30 +117,28 @@ def test_task_configuration_parameters(request): task_max_attempts = 5 task_retry_interval = 7 - with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + with mock.patch('aria.orchestrator.workflow_runner.Engine.execute') as mock_engine_execute: _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts, - task_retry_interval=task_retry_interval) - _, engine_kwargs = mock_engine_cls.call_args - # TODO: fix - # assert engine_kwargs['workflow_context']._task_max_attempts == task_max_attempts - # assert engine_kwargs['workflow_context']._task_retry_interval == task_retry_interval + task_retry_interval=task_retry_interval).execute() + _, engine_kwargs = mock_engine_execute.call_args + assert engine_kwargs['ctx']._task_max_attempts == task_max_attempts + assert engine_kwargs['ctx']._task_retry_interval == task_retry_interval def test_execute(request, service): mock_workflow = _setup_mock_workflow_in_service(request) mock_engine = mock.MagicMock() - with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine) \ - as mock_engine_cls: + with mock.patch('aria.orchestrator.workflow_runner.Engine.execute', return_value=mock_engine) \ + as mock_engine_execute: workflow_runner = _create_workflow_runner(request, mock_workflow) + workflow_runner.execute() - _, engine_kwargs = mock_engine_cls.call_args - # TODO: fix - # assert engine_kwargs['workflow_context'].service.id == service.id - # assert engine_kwargs['workflow_context'].execution.workflow_name == 'test_workflow' + _, engine_kwargs = mock_engine_execute.call_args + assert engine_kwargs['ctx'].service.id == service.id + assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow' - workflow_runner.execute() - mock_engine.execute.assert_called_once_with() + mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context) def test_cancel_execution(request): @@ -156,12 +154,9 @@ def test_cancel_execution(request): def test_execution_model_creation(request, service, model): mock_workflow = _setup_mock_workflow_in_service(request) - with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + with mock.patch('aria.orchestrator.workflow_runner.Engine'): workflow_runner = _create_workflow_runner(request, mock_workflow) - _, engine_kwargs = mock_engine_cls.call_args - # TODO: fix - # assert engine_kwargs['workflow_context'].execution == workflow_runner.execution assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution assert workflow_runner.execution.service.id == service.id assert workflow_runner.execution.workflow_name == mock_workflow http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 3a14a44..2fbf4a9 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -45,7 +45,7 @@ class BaseTest(object): eng = cls._engine(workflow_func=workflow_func, workflow_context=workflow_context, executor=executor) - eng.execute(execution_graph=workflow_runner.get_execution_graph(workflow_context.execution)) + eng.execute(ctx=workflow_context) return eng @staticmethod @@ -262,7 +262,7 @@ class TestCancel(BaseTest): t = threading.Thread(target=eng.execute, kwargs=dict(ctx=workflow_context)) t.start() time.sleep(10) - eng.cancel_execution() + eng.cancel_execution(workflow_context) t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow assert not t.is_alive() # if join is timed out it will not raise an exception assert workflow_context.states == ['start', 'cancel'] @@ -281,7 +281,7 @@ class TestCancel(BaseTest): eng = self._engine(workflow_func=mock_workflow, workflow_context=workflow_context, executor=executor) - eng.cancel_execution() + eng.cancel_execution(workflow_context) execution = workflow_context.execution assert execution.status == models.Execution.CANCELLED http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/4d911208/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index aebae38..61b7ce7 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -29,8 +29,8 @@ from tests import storage def test_task_graph_into_execution_graph(tmpdir): interface_name = 'Standard' operation_name = 'create' - task_context = mock.context.simple(str(tmpdir)) - node = task_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + workflow_context = mock.context.simple(str(tmpdir)) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface = mock.models.create_interface( node.service, interface_name, @@ -38,12 +38,12 @@ def test_task_graph_into_execution_graph(tmpdir): operation_kwargs=dict(function='test') ) node.interfaces[interface.name] = interface - task_context.model.node.update(node) + workflow_context.model.node.update(node) def sub_workflow(name, **_): return api.task_graph.TaskGraph(name) - with context.workflow.current.push(task_context): + with context.workflow.current.push(workflow_context): test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph') simple_before_task = api.task.OperationTask( node, @@ -68,13 +68,12 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(simple_after_task, inner_task_graph) # Direct check - execution = task_context.model.execution.list()[0] + execution = workflow_context.model.execution.list()[0] workflow_runner.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor) - task_context.execution = execution + workflow_context.execution = execution - execution_graph = workflow_runner.get_execution_graph(execution) - execution_tasks = topological_sort(execution_graph) + execution_tasks = topological_sort(workflow_context.graph) assert len(execution_tasks) == 7 @@ -100,7 +99,7 @@ def test_task_graph_into_execution_graph(tmpdir): _assert_execution_is_api_task(next(execution_tasks), simple_after_task) assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW - storage.release_sqlite_storage(task_context.model) + storage.release_sqlite_storage(workflow_context.model) def _assert_execution_is_api_task(execution_task, api_task):