Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 1ad2a0db6 -> ef1419cd0 (forced update)
removed operations and workflows, and fixed minor issue in validation.py Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ef1419cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ef1419cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ef1419cd Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: ef1419cd068ca109f2493f940c6d15e11964c9c4 Parents: 91cbb78 Author: max-orlov <[email protected]> Authored: Mon Jul 10 16:59:03 2017 +0300 Committer: max-orlov <[email protected]> Committed: Mon Jul 10 17:28:23 2017 +0300 ---------------------------------------------------------------------- aria/utils/validation.py | 2 +- tests/orchestrator/test_workflow_runner.py | 82 ++++++++++--------------- 2 files changed, 32 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef1419cd/aria/utils/validation.py ---------------------------------------------------------------------- diff --git a/aria/utils/validation.py b/aria/utils/validation.py index 3452dcc..0c2af10 100644 --- a/aria/utils/validation.py +++ b/aria/utils/validation.py @@ -78,7 +78,7 @@ def validate_function_arguments(func, func_kwargs): # all args without the ones with default values args = func.func_code.co_varnames[:args_count] - non_default_args = args[:len(func.func_defaults)] if func.func_defaults else args + non_default_args = args[:len(func.func_defaults) - 1] if func.func_defaults else args # Check if any args without default values is missing in the func_kwargs for arg in non_default_args: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ef1419cd/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index 112f894..adb19e6 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -359,10 +359,11 @@ class TestResumableWorkflows(object): def test_resume_workflow(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) - self._create_interface(workflow_context, node, mock_resuming_task) + self._create_interface(workflow_context, node, mock_failed_task) wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_two_parallel_tasks_workflow, thread_executor) + workflow_context, mock_parallel_tasks_workflow, thread_executor, + inputs={'number_of_tasks': 2}) wf_thread = Thread(target=wf_runner.execute) wf_thread.daemon = True @@ -403,7 +404,8 @@ class TestResumableWorkflows(object): self._create_interface(workflow_context, node, mock_stuck_task) wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_single_task_workflow, thread_executor) + workflow_context, mock_parallel_tasks_workflow, thread_executor, + inputs={'number_of_tasks': 1}) wf_thread = Thread(target=wf_runner.execute) wf_thread.daemon = True @@ -444,8 +446,9 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_failed_before_resuming) - wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_single_task_workflow, thread_executor) + wf_runner = self._create_initial_workflow_runner(workflow_context, + mock_parallel_tasks_workflow, + thread_executor) wf_thread = Thread(target=wf_runner.execute) wf_thread.setDaemon(True) wf_thread.start() @@ -488,10 +491,14 @@ class TestResumableWorkflows(object): def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) - self._create_interface(workflow_context, node, mock_two_different_tasks) + self._create_interface(workflow_context, node, mock_failed_task) wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_two_parallel_tasks_workflow, thread_executor) + workflow_context, + mock_parallel_tasks_workflow, + thread_executor, + inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2} + ) wf_thread = Thread(target=wf_runner.execute) wf_thread.setDaemon(True) wf_thread.start() @@ -591,35 +598,18 @@ class TestResumableWorkflows(object): @workflow -def mock_two_parallel_tasks_workflow(ctx, graph): +def mock_parallel_tasks_workflow(ctx, graph, + retry_interval=1, max_attempts=10, number_of_tasks=1): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - graph.add_tasks( - api.task.OperationTask( - node, interface_name='aria.interfaces.lifecycle', operation_name='create'), - api.task.OperationTask( - node, interface_name='aria.interfaces.lifecycle', operation_name='create') - ) - - -@operation -def mock_resuming_task(ctx): - ctx.node.attributes['invocations'] += 1 - - if ctx.node.attributes['invocations'] != 1: - custom_events['is_active'].set() - if not custom_events['is_resumed'].isSet(): - # if resume was called, increase by one. o/w fail the execution - second task should - # fail as long it was not a part of resuming the workflow - raise FailingTask("wasn't resumed yet") - - -@workflow -def mock_single_task_workflow(ctx, graph): - node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - graph.add_tasks( - api.task.OperationTask( - node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=10) - ) + tasks = [ + api.task.OperationTask(node, + 'aria.interfaces.lifecycle', + 'create', + retry_interval=retry_interval, + max_attempts=max_attempts) + for _ in xrange(number_of_tasks) + ] + graph.add_tasks(*tasks) @operation @@ -653,23 +643,13 @@ def mock_stuck_task(ctx): time.sleep(5) -@workflow -def mock_two_parallel_tasks_workflow(ctx, graph): - node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - graph.add_tasks( - api.task.OperationTask( - node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1), - api.task.OperationTask( - node, 'aria.interfaces.lifecycle', 'create', retry_interval=1, max_attempts=1), - ) - - @operation -def mock_two_different_tasks(ctx): +def mock_failed_task(ctx): ctx.node.attributes['invocations'] += 1 - # The first task should end gracefully - if ctx.node.attributes['invocations'] == 2: - # The second task should fail only before resuming + if ctx.node.attributes['invocations'] != 1: + custom_events['is_active'].set() if not custom_events['is_resumed'].isSet(): - raise FailingTask("First execution should fail") + # if resume was called, increase by one. o/w fail the execution - second task should + # fail as long it was not a part of resuming the workflow + raise FailingTask("wasn't resumed yet")
