Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-236-Resumable-workflow-executions 92ef0fd8b -> 608f5d791
review 2 Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/608f5d79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/608f5d79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/608f5d79 Branch: refs/heads/ARIA-236-Resumable-workflow-executions Commit: 608f5d7917fc0f332aaa3606732f4a46068ee7f8 Parents: 92ef0fd Author: max-orlov <ma...@gigaspaces.com> Authored: Thu Jun 22 12:27:41 2017 +0300 Committer: max-orlov <ma...@gigaspaces.com> Committed: Thu Jun 22 12:27:41 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflow_runner.py | 23 ++++++++++---------- aria/orchestrator/workflows/core/engine.py | 6 +----- tests/orchestrator/test_workflow_runner.py | 28 ++++++++++++------------- 3 files changed, 26 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/608f5d79/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 829b8cd..2d4b515 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -55,9 +55,12 @@ class WorkflowRunner(object): :param task_retry_interval: Retry interval in between retry attempts of a failing task """ + if not (execution_id or (workflow_name and service_id and not execution_id)): + exceptions.InvalidWorkflowRunnerParams( + "Either provide execution id in order to resume a workflow or workflow name " + "and service id with inputs") + self._is_resume = execution_id is not None - self._is_start = \ - workflow_name is not None and service_id is not None and execution_id is None self._model_storage = model_storage self._resource_storage = resource_storage @@ -65,19 +68,15 
@@ class WorkflowRunner(object): # the IDs are stored rather than the models themselves, so this module could be used # by several threads without raising errors on model objects shared between threads - if self._is_start: - self._service_id = service_id - self._workflow_name = workflow_name - self._validate_workflow_exists_for_service() - self._execution_id = self._create_execution_model(inputs).id - elif self._is_resume: + if self._is_resume: self._execution_id = execution_id self._service_id = self.execution.service.id self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name else: - raise exceptions.InvalidWorkflowRunnerParams( - "Either provide execution id in order to resume a workflow or workflow name " - "and service id with inputs") + self._service_id = service_id + self._workflow_name = workflow_name + self._validate_workflow_exists_for_service() + self._execution_id = self._create_execution_model(inputs).id self._workflow_context = WorkflowContext( name=self.__class__.__name__, @@ -95,7 +94,7 @@ class WorkflowRunner(object): # transforming the execution inputs to dict, to pass them to the workflow function execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values()) - if self._is_start: + if not self._is_resume: workflow_fn = self._get_workflow_fn() tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/608f5d79/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index a7e5148..d5a6e70 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -48,7 +48,7 @@ class Engine(logger.LoggerMixin): executing_tasks = [] if 
resuming: - self._resuming_execution(ctx) + events.on_resume_workflow_signal.send(ctx) try: events.start_workflow_signal.send(ctx) @@ -73,10 +73,6 @@ class Engine(logger.LoggerMixin): raise @staticmethod - def _resuming_execution(ctx): - events.on_resume_workflow_signal.send(ctx) - - @staticmethod def cancel_execution(ctx): """ Send a cancel request to the engine. If execution already started, execution status http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/608f5d79/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index c86d6a7..7a58528 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -53,6 +53,10 @@ events = { } +class TimeoutError(BaseException): + pass + + def test_undeclared_workflow(request): # validating a proper error is raised when the workflow is not declared in the service with pytest.raises(exceptions.UndeclaredWorkflowError): @@ -316,7 +320,7 @@ class TestResumableWorkflows(object): def test_resume_workflow(self, workflow_context, executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_resuming_task) service = workflow_context.service @@ -339,12 +343,13 @@ class TestResumableWorkflows(object): wf_thread.start() # Wait for the execution to start - events['is_active'].wait(5) + if events['is_active'].wait(5) is False: + TimeoutError("is_active wasn't set to True") wf_runner.cancel() - # Make sure the execution was canceled and the task has not ended + if events['execution_ended'].wait(60) is False: + raise TimeoutError("Execution did not end") - events['execution_ended'].wait(10) first_task, second_task = 
workflow_context.model.task.list(filters={'_stub_type': None}) assert first_task.status == first_task.SUCCESS assert second_task.status in (second_task.FAILED, second_task.RETRYING) @@ -366,7 +371,7 @@ class TestResumableWorkflows(object): # Wait for it to finish and assert changes. assert second_task.status == second_task.SUCCESS - assert node.attributes['invocations'].value == 2 + assert node.attributes['invocations'].value == 3 assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED @staticmethod @@ -412,7 +417,7 @@ class TestResumableWorkflows(object): @pytest.fixture(autouse=True) def register_to_events(self): - def execution_ended(**_): + def execution_ended(*args, **kwargs): events['execution_ended'].set() on_cancelled_workflow_signal.connect(execution_ended) @@ -433,16 +438,11 @@ def mock_workflow(ctx, graph): @operation def mock_resuming_task(ctx): + ctx.node.attributes['invocations'] += 1 - if 'invocations' not in ctx.node.attributes: - # This is the first node invocation - ctx.node.attributes['invocations'] = 1 - else: + if ctx.node.attributes['invocations'] != 1: events['is_active'].set() - # This is the second node invocation - if events['is_resumed'].isSet(): + if not events['is_resumed'].isSet(): # if resume was called, increase by one. o/w fail the execution - second task should # fail as long it was not a part of resuming the workflow - ctx.node.attributes['invocations'] += 1 - else: raise BaseException("wasn't resumed yet")