Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-299-Resuming-canceled-execution-with-frozen-task-fails 62875b56c -> 05d2b73ae
review 1 fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/05d2b73a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/05d2b73a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/05d2b73a Branch: refs/heads/ARIA-299-Resuming-canceled-execution-with-frozen-task-fails Commit: 05d2b73ae20d144de581d9a225fdf9ab19dcd0bd Parents: 62875b5 Author: max-orlov <[email protected]> Authored: Mon Jul 10 12:36:22 2017 +0300 Committer: max-orlov <[email protected]> Committed: Mon Jul 10 12:36:22 2017 +0300 ---------------------------------------------------------------------- .../workflows/core/events_handler.py | 8 +- .../orchestrator/execution_plugin/test_local.py | 6 +- tests/orchestrator/execution_plugin/test_ssh.py | 6 +- tests/orchestrator/test_workflow_runner.py | 98 +++++++++++++++----- 4 files changed, 87 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index eb6f271..37801de 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -114,10 +114,6 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): elif execution.status in (execution.SUCCEEDED, execution.FAILED): _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status) else: - # Any non ended task would be put back to pending state - for task in execution.tasks: - if not task.has_ended(): - task.status = task.PENDING execution.status = execution.CANCELLED execution.ended_at = datetime.utcnow() @@ -127,6 +123,10 @@ def 
_workflow_resume(workflow_context, *args, **kwargs): with workflow_context.persist_changes: execution = workflow_context.execution execution.status = execution.PENDING + # Any non ended task would be put back to pending state + for task in execution.tasks: + if not task.has_ended(): + task.status = task.PENDING @events.on_cancelling_workflow_signal.connect http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 5b94917..e64e998 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -509,8 +509,10 @@ if __name__ == '__main__': @pytest.fixture def executor(self): result = process.ProcessExecutor() - yield result - result.close() + try: + yield result + finally: + result.close() @pytest.fixture def workflow_context(self, tmpdir): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index 4fa8184..a96c91d 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -277,8 +277,10 @@ class TestWithActualSSHServer(object): @pytest.fixture def executor(self): result = process.ProcessExecutor() - yield result - result.close() + try: + yield result + finally: + result.close() @pytest.fixture def workflow_context(self, tmpdir): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/05d2b73a/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git 
a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index 3527f34..a77d727 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -14,6 +14,7 @@ # limitations under the License. import json +import time from threading import Thread, Event from datetime import datetime @@ -58,6 +59,10 @@ class TimeoutError(BaseException): pass +class FailingTask(BaseException): + pass + + def test_undeclared_workflow(request): # validating a proper error is raised when the workflow is not declared in the service with pytest.raises(exceptions.UndeclaredWorkflowError): @@ -342,6 +347,14 @@ class TestResumableWorkflows(object): executor=executor) return wf_runner + @staticmethod + def _wait_for_active_and_cancel(workflow_runner): + if custom_events['is_active'].wait(60) is False: + raise TimeoutError("is_active wasn't set to True") + workflow_runner.cancel() + if custom_events['execution_cancelled'].wait(60) is False: + raise TimeoutError("Execution did not end") + def test_resume_workflow(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) @@ -355,18 +368,13 @@ class TestResumableWorkflows(object): wf_thread.start() # Wait for the execution to start - if custom_events['is_active'].wait(5) is False: - raise TimeoutError("is_active wasn't set to True") - wf_runner.cancel() - - if custom_events['execution_cancelled'].wait(60) is False: - raise TimeoutError("Execution did not end") + self._wait_for_active_and_cancel(wf_runner) tasks = workflow_context.model.task.list(filters={'_stub_type': None}) assert any(task.status == task.SUCCESS for task in tasks) - assert any(task.status == task.PENDING for task in tasks) + assert any(task.status == task.RETRYING for task in tasks) custom_events['is_resumed'].set() - assert any(task.status == task.PENDING for 
task in tasks) + assert any(task.status == task.RETRYING for task in tasks) # Create a new workflow runner, with an existing execution id. This would cause # the old execution to restart. @@ -386,27 +394,62 @@ class TestResumableWorkflows(object): assert node.attributes['invocations'].value == 3 assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED - def test_resume_failed_task(self, workflow_context, thread_executor): + def test_resume_started_task(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_stuck_task) + + wf_runner = self._create_initial_workflow_runner( + workflow_context, mock_single_task_workflow, thread_executor) + + wf_thread = Thread(target=wf_runner.execute) + wf_thread.daemon = True + wf_thread.start() + + self._wait_for_active_and_cancel(wf_runner) + task = workflow_context.model.task.list(filters={'_stub_type': None})[0] + assert node.attributes['invocations'].value == 1 + assert task.status == task.STARTED + assert wf_runner.execution.status in (wf_runner.execution.CANCELLED, + wf_runner.execution.CANCELLING) + custom_events['is_resumed'].set() + + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=new_thread_executor) + new_wf_runner.execute() + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. 
+ assert node.attributes['invocations'].value == 2 + assert task.status == task.SUCCESS + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + + def test_resume_failed_task(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) - self._create_interface(workflow_context, node, mock_failed_first_task) + self._create_interface(workflow_context, node, mock_failed_before_resuming) wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_sequential_workflow, thread_executor) + workflow_context, mock_single_task_workflow, thread_executor) wf_thread = Thread(target=wf_runner.execute) wf_thread.setDaemon(True) wf_thread.start() - if custom_events['is_active'].wait(60) is False: - raise TimeoutError("Execution did not end") - wf_runner.cancel() - if custom_events['execution_cancelled'].wait(60) is False: - raise TimeoutError("Execution did not end") + self._wait_for_active_and_cancel(wf_runner) task = workflow_context.model.task.list(filters={'_stub_type': None})[0] assert node.attributes['invocations'].value == 2 - assert task.status == task.PENDING + assert task.status == task.STARTED assert wf_runner.execution.status in (wf_runner.execution.CANCELLED, wf_runner.execution.CANCELLING) @@ -513,13 +556,13 @@ def mock_resuming_task(ctx): if not custom_events['is_resumed'].isSet(): # if resume was called, increase by one. 
o/w fail the execution - second task should # fail as long it was not a part of resuming the workflow - raise BaseException("wasn't resumed yet") + raise FailingTask("wasn't resumed yet") @workflow -def mock_sequential_workflow(ctx, graph): +def mock_single_task_workflow(ctx, graph): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - graph.sequence( + graph.add_tasks( api.task.OperationTask(node, interface_name='aria.interfaces.lifecycle', operation_name='create', @@ -529,7 +572,7 @@ def mock_sequential_workflow(ctx, graph): @operation -def mock_failed_first_task(ctx): +def mock_failed_before_resuming(ctx): """ The task should run atmost ctx.task.max_attempts - 1 times, and only then pass. overall, the number of invocations should be ctx.task.max_attempts - 1 @@ -540,11 +583,20 @@ def mock_failed_first_task(ctx): custom_events['is_active'].set() # unfreeze the thread only when all of the invocations are done while ctx.node.attributes['invocations'] < ctx.task.max_attempts - 1: - pass + time.sleep(5) elif ctx.node.attributes['invocations'] == ctx.task.max_attempts - 1: # pass only just before the end. return else: # fail o.w. - raise BaseException("stop this task") + raise FailingTask("stop this task") + + +@operation +def mock_stuck_task(ctx): + ctx.node.attributes['invocations'] += 1 + while not custom_events['is_resumed'].isSet(): + if not custom_events['is_active'].isSet(): + custom_events['is_active'].set() + time.sleep(5)
