Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-276-Support-model-instrumentation-for-workflows ff1596e0d -> a94a4dd12
more tests fixes Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a94a4dd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a94a4dd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a94a4dd1 Branch: refs/heads/ARIA-276-Support-model-instrumentation-for-workflows Commit: a94a4dd127d9722674e9805161b8f5fdf2b675d7 Parents: ff1596e Author: max-orlov <[email protected]> Authored: Sun Jun 11 12:16:33 2017 +0300 Committer: max-orlov <[email protected]> Committed: Sun Jun 11 12:16:33 2017 +0300 ---------------------------------------------------------------------- aria/storage/collection_instrumentation.py | 1 - tests/orchestrator/context/test_serialize.py | 20 +++-- tests/orchestrator/execution_plugin/test_ssh.py | 50 +++++------ .../orchestrator/workflows/core/test_engine.py | 88 +++++++++++++++----- .../executor/test_process_executor_extension.py | 24 +++--- .../test_process_executor_tracked_changes.py | 26 +++--- 6 files changed, 130 insertions(+), 79 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a94a4dd1/aria/storage/collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py index b8f656c..454f97a 100644 --- a/aria/storage/collection_instrumentation.py +++ b/aria/storage/collection_instrumentation.py @@ -12,7 +12,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from functools import partial from . 
import exceptions http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a94a4dd1/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 4db7bf4..0919e81 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -33,16 +33,10 @@ def test_serialize_operation_context(context, executor, tmpdir): test_file.write(TEST_FILE_CONTENT) resource = context.resource resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file)) - graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) - eng.execute() - -@workflow -def _mock_workflow(ctx, graph): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) plugin = mock.models.create_plugin() - ctx.model.plugin.put(plugin) + context.model.plugin.put(plugin) interface = mock.models.create_interface( node.service, 'test', @@ -51,6 +45,16 @@ def _mock_workflow(ctx, graph): plugin=plugin) ) node.interfaces[interface.name] = interface + context.model.node.update(node) + + graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) + eng.execute() + + +@workflow +def _mock_workflow(ctx, graph): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) task = api.task.OperationTask(node, interface_name='test', operation_name='op') graph.add_tasks(task) return graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a94a4dd1/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git 
a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index 8b326e7..8c4dd2d 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -214,33 +214,33 @@ class TestWithActualSSHServer(object): else: operation = operations.run_script_with_ssh + node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + arguments = { + 'script_path': script_path, + 'fabric_env': _FABRIC_ENV, + 'process': process, + 'use_sudo': use_sudo, + 'custom_env_var': custom_input, + 'test_operation': '', + } + if hide_output: + arguments['hide_output'] = hide_output + if commands: + arguments['commands'] = commands + interface = mock.models.create_interface( + node.service, + 'test', + 'op', + operation_kwargs=dict( + function='{0}.{1}'.format( + operations.__name__, + operation.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + @workflow def mock_workflow(ctx, graph): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - arguments = { - 'script_path': script_path, - 'fabric_env': _FABRIC_ENV, - 'process': process, - 'use_sudo': use_sudo, - 'custom_env_var': custom_input, - 'test_operation': '', - } - if hide_output: - arguments['hide_output'] = hide_output - if commands: - arguments['commands'] = commands - interface = mock.models.create_interface( - node.service, - 'test', - 'op', - operation_kwargs=dict( - function='{0}.{1}'.format( - operations.__name__, - operation.__name__), - arguments=arguments) - ) - node.interfaces[interface.name] = interface - ops = [] for test_operation in test_operations: op_arguments = arguments.copy() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a94a4dd1/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py 
b/tests/orchestrator/workflows/core/test_engine.py index 6d2836c..0438544 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -55,12 +55,7 @@ class BaseTest(object): tasks_graph=graph) @staticmethod - def _op(ctx, - func, - arguments=None, - max_attempts=None, - retry_interval=None, - ignore_failure=None): + def _create_interface(ctx, func, arguments=None): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface_name = 'aria.interfaces.lifecycle' operation_kwargs = dict(function='{name}.{func.__name__}'.format( @@ -72,6 +67,17 @@ class BaseTest(object): interface = mock.models.create_interface(node.service, interface_name, operation_name, operation_kwargs=operation_kwargs) node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + return node, interface_name, operation_name + + @staticmethod + def _op(node, + operation_name, + arguments=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): return api.task.OperationTask( node, @@ -158,9 +164,11 @@ class TestEngine(BaseTest): assert execution.status == models.Execution.SUCCEEDED def test_single_task_successful_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_success_task) + @workflow def mock_workflow(ctx, graph): - graph.add_tasks(self._op(ctx, func=mock_success_task)) + graph.add_tasks(self._op(node, operation_name)) self._execute( workflow_func=mock_workflow, workflow_context=workflow_context, @@ -170,9 +178,11 @@ class TestEngine(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 1 def test_single_task_failed_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_failed_task) + @workflow def mock_workflow(ctx, graph): - graph.add_tasks(self._op(ctx, func=mock_failed_task)) + graph.add_tasks(self._op(node, operation_name)) with 
pytest.raises(exceptions.ExecutorException): self._execute( workflow_func=mock_workflow, @@ -187,10 +197,13 @@ class TestEngine(BaseTest): assert execution.status == models.Execution.FAILED def test_two_tasks_execution_order(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + @workflow def mock_workflow(ctx, graph): - op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) - op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) + op1 = self._op(node, operation_name, arguments={'counter': 1}) + op2 = self._op(node, operation_name, arguments={'counter': 2}) graph.sequence(op1, op2) self._execute( workflow_func=mock_workflow, @@ -202,11 +215,14 @@ class TestEngine(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_stub_and_subworkflow_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + @workflow def sub_workflow(ctx, graph): - op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) + op1 = self._op(node, operation_name, arguments={'counter': 1}) op2 = api.task.StubTask() - op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) + op3 = self._op(node, operation_name, arguments={'counter': 2}) graph.sequence(op1, op2, op3) @workflow @@ -225,11 +241,13 @@ class TestCancel(BaseTest): def test_cancel_started_execution(self, workflow_context, executor): number_of_tasks = 100 + node, _, operation_name = self._create_interface( + workflow_context, mock_sleep_task, {'seconds': 0.1}) @workflow def mock_workflow(ctx, graph): operations = ( - self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1)) + self._op(node, operation_name, arguments=dict(seconds=0.1)) for _ in range(number_of_tasks) ) return graph.sequence(*operations) @@ -267,9 +285,12 @@ class TestCancel(BaseTest): class 
TestRetries(BaseTest): def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=2) graph.add_tasks(op) @@ -283,9 +304,12 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 2}, max_attempts=2) graph.add_tasks(op) @@ -300,9 +324,11 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=3) graph.add_tasks(op) @@ -316,9 +342,12 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 2}, 
max_attempts=3) graph.add_tasks(op) @@ -332,9 +361,11 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 3 def test_infinite_retries(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=-1) graph.add_tasks(op) @@ -358,9 +389,11 @@ class TestRetries(BaseTest): executor=executor) def _test_retry_interval(self, retry_interval, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=2, retry_interval=retry_interval) @@ -378,9 +411,11 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_ignore_failure(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, ignore_failure=True, arguments={'failure_count': 100}, max_attempts=100) @@ -401,10 +436,12 @@ class TestTaskRetryAndAbort(BaseTest): def test_task_retry_default_interval(self, workflow_context, executor): default_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_retry, + op = self._op(node, operation_name, arguments={'message': self.message}, 
retry_interval=default_retry_interval, max_attempts=2) @@ -425,10 +462,13 @@ class TestTaskRetryAndAbort(BaseTest): def test_task_retry_custom_interval(self, workflow_context, executor): default_retry_interval = 100 custom_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message, + 'retry_interval': custom_retry_interval}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_retry, + op = self._op(node, operation_name, arguments={'message': self.message, 'retry_interval': custom_retry_interval}, retry_interval=default_retry_interval, @@ -449,9 +489,11 @@ class TestTaskRetryAndAbort(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_task_abort(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_task_abort, {'message': self.message}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_abort, + op = self._op(node, operation_name, arguments={'message': self.message}, retry_interval=100, max_attempts=100) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a94a4dd1/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 7969457..5f0b75f 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -32,19 +32,23 @@ def test_decorate_extension(context, executor): def get_node(ctx): return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + node = get_node(context) + interface_name = 'test_interface' + operation_name = 'operation' + interface = mock.models.create_interface( + 
context.service, + interface_name, + operation_name, + operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + @workflow def mock_workflow(ctx, graph): node = get_node(ctx) - interface_name = 'test_interface' - operation_name = 'operation' - interface = mock.models.create_interface( - ctx.service, - interface_name, - operation_name, - operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__), - arguments=arguments) - ) - node.interfaces[interface.name] = interface task = api.task.OperationTask( node, interface_name=interface_name, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a94a4dd1/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 2d80a3b..7dbcc5a 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -83,20 +83,22 @@ def test_apply_tracked_changes_during_an_operation(context, executor): def _run_workflow(context, executor, op_func, arguments=None): + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'test_interface' + operation_name = 'operation' + wf_arguments = arguments or {} + interface = mock.models.create_interface( + context.service, + interface_name, + operation_name, + operation_kwargs=dict(function=_operation_mapping(op_func), + arguments=wf_arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + @workflow def mock_workflow(ctx, graph): - node = 
ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - interface_name = 'test_interface' - operation_name = 'operation' - wf_arguments = arguments or {} - interface = mock.models.create_interface( - ctx.service, - interface_name, - operation_name, - operation_kwargs=dict(function=_operation_mapping(op_func), - arguments=wf_arguments) - ) - node.interfaces[interface.name] = interface task = api.task.OperationTask( node, interface_name=interface_name,
