Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-46-execution-plugin a48d31c74 -> 433d55ea6 (forced update)
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py new file mode 100644 index 0000000..98ceff9 --- /dev/null +++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py @@ -0,0 +1,359 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import time +import sys +import subprocess +import StringIO + +import pytest + +from aria.orchestrator.execution_plugin import ctx_proxy + + +class TestCtxProxy(object): + + def test_attribute_access(self, server): + response = self.request(server, 'stub_attr', 'some_property') + assert response == 'some_value' + + def test_sugared_attribute_access(self, server): + response = self.request(server, 'stub-attr', 'some-property') + assert response == 'some_value' + + def test_dict_prop_access_get_key(self, server): + response = self.request(server, 'node', 'properties', 'prop1') + assert response == 'value1' + + def test_dict_prop_access_get_key_nested(self, server): + response = self.request(server, 'node', 'properties', 'prop2.nested_prop1') + assert response == 'nested_value1' + + def test_dict_prop_access_get_with_list_index(self, server): + response = self.request(server, 'node', 'properties', 'prop3[2].value') + assert response == 'value_2' + + def test_dict_prop_access_set(self, server, ctx): + self.request(server, 'node', 'properties', 'prop4.key', 'new_value') + self.request(server, 'node', 'properties', 'prop3[2].value', 'new_value_2') + self.request(server, 'node', 'properties', 'prop4.some.new.path', + 'some_new_value') + assert ctx.node.properties['prop4']['key'] == 'new_value' + assert ctx.node.properties['prop3'][2]['value'] == 'new_value_2' + assert ctx.node.properties['prop4']['some']['new']['path'] == 'some_new_value' + + def test_illegal_dict_access(self, server): + self.request(server, 'node', 'properties', 'prop4.key', 'new_value') + with pytest.raises(RuntimeError): + self.request(server, 'node', 'properties', 'prop4.key', 'new_value', 'what') + + def test_method_invocation(self, server): + args = ['arg1', 'arg2', 'arg3'] + response_args = self.request(server, 'stub-method', *args) + assert response_args == args + + def test_method_invocation_no_args(self, server): + response = self.request(server, 'stub-method') + assert response == [] + + def test_method_invocation_kwargs(self, server): + arg1 = 'arg1' + arg2 = 'arg2' + arg4 = 'arg4_override' + arg5 = 'arg5' + kwargs = dict( + arg4=arg4, + arg5=arg5) + response = self.request(server, 'stub_args', arg1, arg2, kwargs) + assert response == dict( + arg1=arg1, + arg2=arg2, + arg3='arg3', + arg4=arg4, + args=[], + kwargs=dict( + arg5=arg5)) + + def test_empty_return_value(self, server): + response = self.request(server, 'stub_none') + assert response is None + + def test_client_request_timeout(self, server): + with pytest.raises(IOError): + ctx_proxy.client._client_request(server.socket_url, + args=['stub-sleep', '0.5'], + timeout=0.1) + + def test_processing_exception(self, server): + with pytest.raises(ctx_proxy.client._RequestError): + self.request(server, 'property_that_does_not_exist') + + def test_not_json_serializable(self, server): + with pytest.raises(ctx_proxy.client._RequestError): + self.request(server, 'logger') + + def test_no_string_arg(self, server): + args = ['stub_method', 1, 2] + response = self.request(server, *args) + assert response == args[1:] + + class StubAttribute(object): + some_property = 'some_value' + + class NodeAttribute(object): + def __init__(self, properties): + self.properties = properties + + @staticmethod + def stub_method(*args): + return args + + @staticmethod + def stub_sleep(seconds): + time.sleep(float(seconds)) + + @staticmethod + def stub_args(arg1, arg2, arg3='arg3', arg4='arg4', *args, **kwargs): + return dict( + arg1=arg1, + arg2=arg2, + arg3=arg3, + arg4=arg4, + args=args, + kwargs=kwargs) + + @pytest.fixture + def ctx(self): + class MockCtx(object): + pass + ctx = MockCtx() + properties = { + 'prop1': 'value1', + 'prop2': { + 'nested_prop1': 'nested_value1' + }, + 'prop3': [ + {'index': 0, 'value': 'value_0'}, + {'index': 1, 'value': 'value_1'}, + {'index': 2, 'value': 'value_2'} + ], + 'prop4': { + 'key': 'value' + } + } + ctx.stub_none = None + ctx.stub_method = self.stub_method + ctx.stub_sleep = self.stub_sleep + ctx.stub_args = self.stub_args + ctx.stub_attr = self.StubAttribute() + ctx.node = self.NodeAttribute(properties) + return ctx + + @pytest.fixture + def server(self, ctx): + result = ctx_proxy.server.CtxProxy(ctx) + yield result + result.close() + + def request(self, server, *args): + return ctx_proxy.client._client_request(server.socket_url, args, timeout=5) + + +class TestArgumentParsing(object): + + def test_socket_url_arg(self): + self.expected.update(dict(socket_url='sock_url')) + ctx_proxy.client.main(['--socket-url', self.expected.get('socket_url')]) + + def test_socket_url_env(self): + expected_socket_url = 'env_sock_url' + os.environ['CTX_SOCKET_URL'] = expected_socket_url + self.expected.update(dict(socket_url=expected_socket_url)) + ctx_proxy.client.main([]) + + def test_socket_url_missing(self): + del os.environ['CTX_SOCKET_URL'] + with pytest.raises(RuntimeError): + ctx_proxy.client.main([]) + + def test_args(self): + self.expected.update(dict(args=['1', '2', '3'])) + ctx_proxy.client.main(self.expected.get('args')) + + def test_timeout(self): + self.expected.update(dict(timeout='10')) + ctx_proxy.client.main(['--timeout', self.expected.get('timeout')]) + self.expected.update(dict(timeout='15')) + ctx_proxy.client.main(['-t', self.expected.get('timeout')]) + + def test_mixed_order(self): + self.expected.update(dict( + args=['1', '2', '3'], timeout='20', socket_url='mixed_socket_url')) + ctx_proxy.client.main( + ['-t', self.expected.get('timeout')] + + ['--socket-url', self.expected.get('socket_url')] + + self.expected.get('args')) + ctx_proxy.client.main( + ['-t', self.expected.get('timeout')] + + self.expected.get('args') + + ['--socket-url', self.expected.get('socket_url')]) + ctx_proxy.client.main( + self.expected.get('args') + + ['-t', self.expected.get('timeout')] + + ['--socket-url', self.expected.get('socket_url')]) + + def test_json_args(self): + args = ['@1', '@[1,2,3]', '@{"key":"value"}'] + expected_args = [1, [1, 2, 3], {'key': 'value'}] + self.expected.update(dict(args=expected_args)) + ctx_proxy.client.main(args) + + def test_json_arg_prefix(self): + args = ['_1', '@1'] + expected_args = [1, '@1'] + self.expected.update(dict(args=expected_args)) + ctx_proxy.client.main(args + ['--json-arg-prefix', '_']) + + def test_json_output(self): + self.assert_valid_output('string', 'string', '"string"') + self.assert_valid_output(1, '1', '1') + self.assert_valid_output([1, '2'], "[1, '2']", '[1, "2"]') + self.assert_valid_output({'key': 1}, + "{'key': 1}", + '{"key": 1}') + self.assert_valid_output(False, '', 'false') + self.assert_valid_output(True, 'True', 'true') + self.assert_valid_output([], '', '[]') + self.assert_valid_output({}, '', '{}') + + def assert_valid_output(self, response, ex_typed_output, ex_json_output): + self.mock_response = response + current_stdout = sys.stdout + + def run(args, expected): + output = StringIO.StringIO() + sys.stdout = output + ctx_proxy.client.main(args) + assert output.getvalue() == expected + + try: + run([], ex_typed_output) + run(['-j'], ex_json_output) + run(['--json-output'], ex_json_output) + finally: + sys.stdout = current_stdout + + def mock_client_request(self, socket_url, args, timeout): + assert socket_url == self.expected.get('socket_url') + assert args == self.expected.get('args') + assert timeout == int(self.expected.get('timeout')) + return self.mock_response + + @pytest.fixture(autouse=True) + def patch_client_request(self, mocker): + mocker.patch.object(ctx_proxy.client, + ctx_proxy.client._client_request.__name__, + self.mock_client_request) + mocker.patch.dict('os.environ', {'CTX_SOCKET_URL': 'stub'}) + + @pytest.fixture(autouse=True) + def defaults(self): + self.expected = dict(args=[], timeout=30, socket_url='stub') + self.mock_response = None + + +class TestCtxEntryPoint(object): + + def test_ctx_in_path(self): + p = subprocess.Popen(['ctx', '--help'], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + p.communicate() + assert not p.wait() + + +class TestPathDictAccess(object): + def test_simple_set(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo', 42) + assert obj == {'foo': 42} + + def test_nested_set(self): + obj = {'foo': {}} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo.bar', 42) + assert obj == {'foo': {'bar': 42}} + + def test_set_index(self): + obj = {'foo': [None, {'bar': 0}]} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo[1].bar', 42) + assert obj == {'foo': [None, {'bar': 42}]} + + def test_set_nonexistent_parent(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo.bar', 42) + assert obj == {'foo': {'bar': 42}} + + def test_set_nonexistent_parent_nested(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + path_dict.set('foo.bar.baz', 42) + assert obj == {'foo': {'bar': {'baz': 42}}} + + def test_simple_get(self): + obj = {'foo': 42} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo') + assert result == 42 + + def test_nested_get(self): + obj = {'foo': {'bar': 42}} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo.bar') + assert result == 42 + + def test_nested_get_shadows_dotted_name(self): + obj = {'foo': {'bar': 42}, 'foo.bar': 58} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo.bar') + assert result == 42 + + def test_index_get(self): + obj = {'foo': [0, 1]} + path_dict = ctx_proxy.server._PathDictAccess(obj) + result = path_dict.get('foo[1]') + assert result == 1 + + def test_get_nonexistent(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + with pytest.raises(RuntimeError): + path_dict.get('foo') + + def test_get_by_index_not_list(self): + obj = {'foo': {0: 'not-list'}} + path_dict = ctx_proxy.server._PathDictAccess(obj) + with pytest.raises(RuntimeError): + path_dict.get('foo[0]') + + def test_get_by_index_nonexistent_parent(self): + obj = {} + path_dict = ctx_proxy.server._PathDictAccess(obj) + with pytest.raises(RuntimeError): + path_dict.get('foo[1]') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_global_ctx.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_global_ctx.py b/tests/orchestrator/execution_plugin/test_global_ctx.py new file mode 100644 index 0000000..dad7547 --- /dev/null +++ b/tests/orchestrator/execution_plugin/test_global_ctx.py @@ -0,0 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria.orchestrator import execution_plugin + + +def test_python_script_scope(): + assert execution_plugin.ctx is None + assert execution_plugin.inputs is None + ctx = object() + inputs = object() + with execution_plugin.python_script_scope(operation_ctx=ctx, operation_inputs=inputs): + assert execution_plugin.ctx is ctx + assert execution_plugin.inputs is inputs + assert execution_plugin.ctx is None + assert execution_plugin.inputs is None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py new file mode 100644 index 0000000..497da48 --- /dev/null +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -0,0 +1,587 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os + +import pytest + +from aria import workflow +from aria.orchestrator import events +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.exceptions import ExecutorException +from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException +from aria.orchestrator.execution_plugin import operations +from aria.orchestrator.execution_plugin.exceptions import ProcessException +from aria.orchestrator.execution_plugin import local +from aria.orchestrator.execution_plugin import constants +from aria.orchestrator.workflows.executor import process +from aria.orchestrator.workflows.core import engine + +from tests import mock, storage +from tests.orchestrator.workflows.helpers import events_collector + +IS_WINDOWS = os.name == 'nt' + + +class TestLocalRunScript(object): + + def test_script_path_parameter(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node-instance runtime-properties map.key value + ''', + windows_script=''' + ctx node-instance runtime-properties map.key value + ''') + props = self._run( + executor, workflow_context, + script_path=script_path) + assert props['map']['key'] == 'value' + + def test_process_env(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node-instance runtime-properties map.key1 $key1 + ctx node-instance runtime-properties map.key2 $key2 + ''', + windows_script=''' + ctx node-instance runtime-properties map.key1 %key1% + ctx node-instance runtime-properties map.key2 %key2% + ''') + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'env': { + 'key1': 'value1', + 'key2': 'value2' + } + }) + p_map = props['map'] + assert p_map['key1'] == 'value1' + assert p_map['key2'] == 'value2' + + def test_process_cwd(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node-instance runtime-properties map.cwd $PWD + ''', + windows_script=''' + ctx node-instance runtime-properties map.cwd %CD% + ''') + tmpdir = str(tmpdir) + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'cwd': tmpdir + }) + p_map = props['map'] + assert p_map['cwd'] == tmpdir + + def test_process_command_prefix(self, executor, workflow_context, tmpdir): + use_ctx = 'ctx node-instance runtime-properties map.key value' + python_script = ['import subprocess', + 'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)] + python_script = '\n'.join(python_script) + script_path = self._create_script( + tmpdir, + linux_script=python_script, + windows_script=python_script, + windows_suffix='', + linux_suffix='') + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'env': {'TEST_KEY': 'value'}, + 'command_prefix': 'python' + }) + p_map = props['map'] + assert p_map['key'] == 'value' + + def test_process_args(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node-instance runtime-properties map.arg1 "$1" + ctx node-instance runtime-properties map.arg2 $2 + ''', + windows_script=''' + ctx node-instance runtime-properties map.arg1 %1 + ctx node-instance runtime-properties map.arg2 %2 + ''') + props = self._run( + executor, workflow_context, + script_path=script_path, + process={ + 'args': ['"arg with spaces"', 'arg2'] + }) + assert props['map']['arg1'] == 'arg with spaces' + assert props['map']['arg2'] == 'arg2' + + def test_no_script_path(self, executor, workflow_context): + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=None) + assert isinstance(exception, TaskAbortException) + assert 'script_path' in exception.message + + def test_script_error(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + echo 123123 + command_that_does_not_exist + ''', + windows_script=''' + @echo off + echo 123123 + command_that_does_not_exist + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert exception.exit_code == 1 if IS_WINDOWS else 127 + assert exception.stdout.strip() == '123123' + assert 'command_that_does_not_exist' in exception.stderr + + def test_script_error_from_bad_ctx_request(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx property_that_does_not_exist + ''', + windows_script=''' + ctx property_that_does_not_exist + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert exception.exit_code == 1 + assert 'RequestError' in exception.stderr + assert 'property_that_does_not_exist' in exception.stderr + + def test_python_script(self, executor, workflow_context, tmpdir): + script = ''' +from aria.orchestrator.execution_plugin import ctx, inputs +if __name__ == '__main__': + ctx.node_instance.runtime_properties['key'] = inputs['key'] +''' + suffix = '.py' + script_path = self._create_script( + tmpdir, + linux_script=script, + windows_script=script, + linux_suffix=suffix, + windows_suffix=suffix) + props = self._run( + executor, workflow_context, + script_path=script_path, + inputs={'key': 'value'}) + assert props['key'] == 'value' + + @pytest.mark.parametrize( + 'value', ['string-value', [1, 2, 3], 999, 3.14, False, + {'complex1': {'complex2': {'key': 'value'}, 'list': [1, 2, 3]}}]) + def test_inputs_as_environment_variables(self, executor, workflow_context, tmpdir, value): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node-instance runtime-properties key "${input_as_env_var}" + ''', + windows_script=''' + ctx node-instance runtime-properties key "%input_as_env_var%" + ''') + props = self._run( + executor, workflow_context, + script_path=script_path, + env_var=value) + expected = props['key'] if isinstance(value, basestring) else json.loads(props['key']) + assert expected == value + + @pytest.mark.parametrize('value', ['override', {'key': 'value'}]) + def test_explicit_env_variables_inputs_override( + self, executor, workflow_context, tmpdir, value): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node-instance runtime-properties key "${input_as_env_var}" + ''', + windows_script=''' + ctx node-instance runtime-properties key "%input_as_env_var%" + ''') + + props = self._run( + executor, workflow_context, + script_path=script_path, + env_var='test-value', + process={ + 'env': { + 'input_as_env_var': value + } + }) + expected = props['key'] if isinstance(value, basestring) else json.loads(props['key']) + assert expected == value + + def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx node-instance runtime-properties nonexistent + ''', + windows_script=''' + ctx node-instance runtime-properties nonexistent + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert 'RequestError' in exception.stderr + assert 'nonexistent' in exception.stderr + + def test_get_nonexistent_runtime_property_json(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx -j instance runtime-properties nonexistent + ''', + windows_script=''' + ctx -j instance runtime-properties nonexistent + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, ProcessException) + assert os.path.basename(script_path) in exception.command + assert 'RequestError' in exception.stderr + assert 'nonexistent' in exception.stderr + + def test_abort(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task abort abort-message + ''', + windows_script=''' + ctx task abort abort-message + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == 'abort-message' + + def test_retry(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task retry retry-message + ''', + windows_script=''' + ctx task retry retry-message + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskRetryException) + assert exception.message == 'retry-message' + + def test_retry_with_interval(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task retry retry-message @100 + ''', + windows_script=''' + ctx task retry retry-message @100 + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskRetryException) + assert exception.message == 'retry-message' + assert exception.retry_interval == 100 + + def test_crash_abort_after_retry(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task retry retry-message + ctx task abort should-raise-a-runtime-error + ''', + windows_script=''' + ctx task retry retry-message + ctx task abort should-raise-a-runtime-error + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_crash_retry_after_abort(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task abort abort-message + ctx task retry should-raise-a-runtime-error + ''', + windows_script=''' + ctx task abort abort-message + ctx task retry should-raise-a-runtime-error + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_crash_abort_after_abort(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task abort abort-message + ctx task abort should-raise-a-runtime-error + ''', + windows_script=''' + ctx task abort abort-message + ctx task abort should-raise-a-runtime-error + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_crash_retry_after_retry(self, executor, workflow_context, tmpdir): + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash + ctx task retry retry-message + ctx task retry should-raise-a-runtime-error + ''', + windows_script=''' + ctx task retry retry-message + ctx task retry should-raise-a-runtime-error + ''') + exception = self._run_and_get_task_exception( + executor, workflow_context, + script_path=script_path) + assert isinstance(exception, TaskAbortException) + assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE + + def test_retry_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir): + log_path = tmpdir.join('temp.log') + message = 'message' + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task retry '{0}' 2> {1} + echo should-not-run > {1} + '''.format(message, log_path), + windows_script=''' + ctx task retry "{0}" 2> {1} + if %errorlevel% neq 0 exit /b %errorlevel% + echo should-not-run > {1} + '''.format(message, log_path)) + with pytest.raises(ExecutorException): + self._run( + executor, workflow_context, + script_path=script_path) + assert log_path.read().strip() == message + + def test_abort_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir): + log_path = tmpdir.join('temp.log') + message = 'message' + script_path = self._create_script( + tmpdir, + linux_script='''#! /bin/bash -e + ctx task abort '{0}' 2> {1} + echo should-not-run > {1} + '''.format(message, log_path), + windows_script=''' + ctx task abort "{0}" 2> {1} + if %errorlevel% neq 0 exit /b %errorlevel% + echo should-not-run > {1} + '''.format(message, log_path)) + with pytest.raises(ExecutorException): + self._run( + executor, workflow_context, + script_path=script_path) + assert log_path.read().strip() == message + + def _create_script(self, + tmpdir, + linux_script, + windows_script, + windows_suffix='.bat', + linux_suffix=''): + suffix = windows_suffix if IS_WINDOWS else linux_suffix + script = windows_script if IS_WINDOWS else linux_script + script_path = tmpdir.join('script{0}'.format(suffix)) + script_path.write(script) + return str(script_path) + + def _run_and_get_task_exception(self, *args, **kwargs): + signal = events.on_failure_task_signal + with events_collector(signal) as collected: + with pytest.raises(ExecutorException): + self._run(*args, **kwargs) + return collected[signal][0]['kwargs']['exception'] + + def _run(self, + executor, + workflow_context, + script_path, + process=None, + env_var='value', + inputs=None): + local_script_path = script_path + script_path = os.path.basename(local_script_path) if local_script_path else None + if script_path: + workflow_context.resource.deployment.upload( + entry_id=str(workflow_context.deployment.id), + source=local_script_path, + path=script_path) + + inputs = inputs or {} + inputs.update({ + 'script_path': script_path, + 'process': process, + 'input_as_env_var': env_var + }) + + @workflow + def mock_workflow(ctx, graph): + op = 'test.op' + node_instance = ctx.model.node_instance.get_by_name( + mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + node_instance.node.operations[op] = { + 'operation': '{0}.{1}'.format(operations.__name__, + operations.run_script_locally.__name__)} + graph.add_tasks(api.task.OperationTask.node_instance( + instance=node_instance, + name=op, + inputs=inputs)) + return graph + tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter + eng = engine.Engine( + executor=executor, + workflow_context=workflow_context, + tasks_graph=tasks_graph) + eng.execute() + return workflow_context.model.node_instance.get_by_name( + mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties + + @pytest.fixture + def executor(self): + result = process.ProcessExecutor() + yield result + result.close() + + @pytest.fixture + def workflow_context(self, tmpdir): + workflow_context = mock.context.simple( + storage.get_sqlite_api_kwargs(str(tmpdir)), + resources_dir=str(tmpdir.join('resources'))) + workflow_context.states = [] + workflow_context.exception = None + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + +class BaseTestConfiguration(object): + + @pytest.fixture(autouse=True) + def mock_execute(self, mocker): + def eval_func(**_): + self.called = 'eval' + + def execute_func(process, **_): + self.process = process + self.called = 'execute' + self.process = {} + self.called = None + mocker.patch.object(local, '_execute_func', execute_func) + mocker.patch.object(local, '_eval_script_func', eval_func) + + class Ctx(object): + @staticmethod + def download_resource(destination, *args, **kwargs): + return destination + + def _run(self, script_path, process=None): + local.run_script( + script_path=script_path, + process=process, + ctx=self.Ctx) + + +class TestPowerShellConfiguration(BaseTestConfiguration): + + def test_implicit_powershell_call_with_ps1_extension(self): + self._run(script_path='script_path.ps1') + assert self.process['command_prefix'] == 'powershell' + + def test_command_prefix_is_overridden_for_ps1_extension(self): + self._run(script_path='script_path.ps1', + process={'command_prefix': 'bash'}) + assert self.process['command_prefix'] == 'bash' + + def test_explicit_powershell_call(self): + self._run(script_path='script_path.ps1', + process={'command_prefix': 'powershell'}) + assert self.process['command_prefix'] == 'powershell' + + +class TestEvalPythonConfiguration(BaseTestConfiguration): + + def test_explicit_eval_without_py_extension(self): + self._run(script_path='script_path', + process={'eval_python': True}) + assert self.called == 'eval' + + def test_explicit_eval_with_py_extension(self): + self._run(script_path='script_path.py', + process={'eval_python': True}) + assert self.called == 'eval' + + def test_implicit_eval(self): + self._run(script_path='script_path.py') + assert self.called == 'eval' + + def test_explicit_execute_without_py_extension(self): + self._run(script_path='script_path', + process={'eval_python': False}) + assert self.called == 'execute' + + def test_explicit_execute_with_py_extension(self): + self._run(script_path='script_path.py', + process={'eval_python': False}) + assert self.called == 'execute' + + def test_implicit_execute(self): + self._run(script_path='script_path') + assert self.called == 'execute' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py new file mode 100644 index 0000000..6b5c783 --- /dev/null +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -0,0 +1,481 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +import json +import logging +import os + +import pytest + +import fabric.api +from fabric.contrib import files +from fabric import context_managers + +from aria.storage import model +from aria.orchestrator import events +from aria.orchestrator import workflow +from aria.orchestrator.workflows import api +from aria.orchestrator.workflows.executor import process +from aria.orchestrator.workflows.core import engine +from aria.orchestrator.workflows.exceptions import ExecutorException +from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException +from aria.orchestrator.execution_plugin import operations +from aria.orchestrator.execution_plugin import constants +from aria.orchestrator.execution_plugin.exceptions import ProcessException, TaskException +from aria.orchestrator.execution_plugin.ssh import operations as ssh_operations + +from tests import mock, storage, resources +from tests.orchestrator.workflows.helpers import events_collector + +_CUSTOM_BASE_DIR = '/tmp/new-aria-ctx' + +_FABRIC_ENV = { + 'host_string': 'localhost', + 'user': 'travis', + 'password': 'travis' +} + + +@pytest.mark.skipif(not os.environ.get('TRAVIS'), reason='actual ssh server required') +class TestWithActualSSHServer(object): + + def test_run_script_basic(self): + expected_runtime_property_value = 'some_value' + props = self._execute(env={'test_value': expected_runtime_property_value}) + assert props['test_value'] == expected_runtime_property_value + + @pytest.mark.skip(reason='sudo privileges are required') + def test_run_script_as_sudo(self): + self._execute(use_sudo=True) + with self._ssh_env(): + assert files.exists('/opt/test_dir') + fabric.api.sudo('rm -rf /opt/test_dir') + + def test_run_script_default_base_dir(self): + props = self._execute() + assert props['work_dir'] == '{0}/work'.format(constants.DEFAULT_BASE_DIR) + + @pytest.mark.skip(reason='Re-enable once output from process executor can be captured') + @pytest.mark.parametrize('hide_groups', [[], ['everything']]) + def test_run_script_with_hide(self, hide_groups): + self._execute(hide_output=hide_groups) + output = 'TODO' + expected_log_message = ('[localhost] run: source {0}/scripts/' + .format(constants.DEFAULT_BASE_DIR)) + if hide_groups: + assert expected_log_message not in output + else: + assert expected_log_message in output + + def test_run_script_process_config(self): + expected_env_value = 'test_value_env' + expected_arg1_value = 'test_value_arg1' + expected_arg2_value = 'test_value_arg2' + expected_cwd = '/tmp' + expected_base_dir = _CUSTOM_BASE_DIR + props = self._execute( + env={'test_value_env': expected_env_value}, + process={ + 'args': [expected_arg1_value, expected_arg2_value], + 'cwd': expected_cwd, + 'base_dir': expected_base_dir + }) + assert props['env_value'] == expected_env_value + assert len(props['bash_version']) > 0 + assert props['arg1_value'] == expected_arg1_value + assert props['arg2_value'] == expected_arg2_value + assert props['cwd'] == expected_cwd + assert props['ctx_path'] == '{0}/ctx'.format(expected_base_dir) + + def test_run_script_command_prefix(self): + props = self._execute(process={'command_prefix': 'bash -i'}) + assert 'i' in props['dollar_dash'] + + def test_run_script_reuse_existing_ctx(self): + expected_test_value_1 = 'test_value_1' + expected_test_value_2 = 'test_value_2' + props = self._execute( + test_operations=['{0}_1'.format(self.test_name), + '{0}_2'.format(self.test_name)], + env={'test_value1': expected_test_value_1, + 'test_value2': expected_test_value_2}) + assert props['test_value1'] == expected_test_value_1 + assert props['test_value2'] == expected_test_value_2 + + def test_run_script_download_resource_plain(self, tmpdir): + resource = tmpdir.join('resource') + resource.write('content') + self._upload(str(resource), 'test_resource') + props = self._execute() + assert props['test_value'] == 'content' + + def test_run_script_download_resource_and_render(self, tmpdir): + resource = tmpdir.join('resource') + resource.write('{{ctx.deployment.name}}') + self._upload(str(resource), 'test_resource') + props = self._execute() + assert props['test_value'] == self._workflow_context.deployment.name + + @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}]) + def test_run_script_inputs_as_env_variables_no_override(self, value): + props = self._execute(custom_input=value) + return_value = props['test_value'] + expected = return_value if isinstance(value, basestring) else json.loads(return_value) + assert value == expected + + @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}]) + def test_run_script_inputs_as_env_variables_process_env_override(self, value): + props = self._execute(custom_input='custom-input-value', + env={'custom_env_var': value}) + return_value = props['test_value'] + expected = return_value if isinstance(value, basestring) else json.loads(return_value) + assert value == expected + + def test_run_script_error_in_script(self): + exception = self._execute_and_get_task_exception() + assert isinstance(exception, TaskException) + + def test_run_script_abort_immediate(self): + exception = self._execute_and_get_task_exception() + assert isinstance(exception, TaskAbortException) + assert exception.message == 'abort-message' + + def test_run_script_retry(self): + exception = self._execute_and_get_task_exception() + assert isinstance(exception, TaskRetryException) + assert exception.message == 'retry-message' + + def test_run_script_abort_error_ignored_by_script(self): + exception = self._execute_and_get_task_exception() + assert isinstance(exception, TaskAbortException) + assert exception.message == 'abort-message' + + def test_run_commands(self): + temp_file_path = '/tmp/very_temporary_file' + with self._ssh_env(): + if files.exists(temp_file_path): + fabric.api.run('rm {0}'.format(temp_file_path)) + self._execute(commands=['touch {0}'.format(temp_file_path)]) + with self._ssh_env(): + assert files.exists(temp_file_path) + fabric.api.run('rm {0}'.format(temp_file_path)) + + @pytest.fixture(autouse=True) + def _setup(self, request, workflow_context, executor, capfd): + self._workflow_context = workflow_context + self._executor = executor + self._capfd = capfd + self.test_name = request.node.originalname or request.node.name + with self._ssh_env(): + for directory in [constants.DEFAULT_BASE_DIR, _CUSTOM_BASE_DIR]: + if files.exists(directory): + fabric.api.run('rm -rf {0}'.format(directory)) + + @contextlib.contextmanager + def _ssh_env(self): + with self._capfd.disabled(): + with context_managers.settings(fabric.api.hide('everything'), + **_FABRIC_ENV): + yield + + def _execute(self, + env=None, + use_sudo=False, + hide_output=None, + process=None, + custom_input='', + test_operations=None, + commands=None): + process = process or {} + if env: + process.setdefault('env', {}).update(env) + + test_operations = test_operations or [self.test_name] + + local_script_path = os.path.join(resources.DIR, 'scripts', 'test_ssh.sh') + script_path = os.path.basename(local_script_path) + self._upload(local_script_path, script_path) + + if commands: + operation = operations.run_commands_with_ssh + else: + operation = operations.run_script_with_ssh + + @workflow + def mock_workflow(ctx, graph): + op = 'test.op' + node_instance = ctx.model.node_instance.get_by_name( + mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + node_instance.node.operations[op] = { + 'operation': '{0}.{1}'.format(operations.__name__, operation.__name__)} + graph.sequence(*[api.task.OperationTask.node_instance( + instance=node_instance, + name=op, + inputs={ + 'script_path': script_path, + 'fabric_env': _FABRIC_ENV, + 'process': process, + 'use_sudo': use_sudo, + 'hide_output': hide_output, + 'custom_env_var': custom_input, + 'test_operation': test_operation, + 'commands': commands + }) for test_operation in test_operations]) + return graph + tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter + eng = engine.Engine( + executor=self._executor, + workflow_context=self._workflow_context, + tasks_graph=tasks_graph) + eng.execute() + return self._workflow_context.model.node_instance.get_by_name( + mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties + + def _execute_and_get_task_exception(self, *args, **kwargs): + signal = events.on_failure_task_signal + with events_collector(signal) as collected: + with pytest.raises(ExecutorException): + self._execute(*args, **kwargs) + return collected[signal][0]['kwargs']['exception'] + + def _upload(self, source, path): + self._workflow_context.resource.deployment.upload( + entry_id=str(self._workflow_context.deployment.id), + source=source, + path=path) + + @pytest.fixture + def executor(self): + result = process.ProcessExecutor() + yield result + result.close() + + @pytest.fixture + def workflow_context(self, tmpdir): + workflow_context = mock.context.simple( + storage.get_sqlite_api_kwargs(str(tmpdir)), + resources_dir=str(tmpdir.join('resources'))) + workflow_context.states = [] + workflow_context.exception = None + yield workflow_context + storage.release_sqlite_storage(workflow_context.model) + + +class TestFabricEnvHideGroupsAndRunCommands(object): + + def test_fabric_env_default_override(self): + # first sanity for no override + self._run() + assert self.mock.settings_merged['timeout'] == constants.FABRIC_ENV_DEFAULTS['timeout'] + # now override + invocation_fabric_env = self.default_fabric_env.copy() + timeout = 1000000 + invocation_fabric_env['timeout'] = timeout + self._run(fabric_env=invocation_fabric_env) + assert self.mock.settings_merged['timeout'] == timeout + + def test_implicit_host_string(self, mocker): + expected_ip = '1.1.1.1' + mocker.patch.object(self._Ctx.task.runs_on, 'ip', expected_ip) + fabric_env = self.default_fabric_env.copy() + del fabric_env['host_string'] + self._run(fabric_env=fabric_env) + assert self.mock.settings_merged['host_string'] == expected_ip + + def test_explicit_host_string(self): + fabric_env = self.default_fabric_env.copy() + host_string = 'explicit_host_string' + fabric_env['host_string'] = host_string + self._run(fabric_env=fabric_env) + assert self.mock.settings_merged['host_string'] == host_string + + def test_override_warn_only(self): + fabric_env = self.default_fabric_env.copy() + self._run(fabric_env=fabric_env) + assert self.mock.settings_merged['warn_only'] is True + fabric_env = self.default_fabric_env.copy() + fabric_env['warn_only'] = False + self._run(fabric_env=fabric_env) + assert self.mock.settings_merged['warn_only'] is False + + def test_missing_host_string(self): + with pytest.raises(TaskAbortException) as exc_ctx: + fabric_env = self.default_fabric_env.copy() + del fabric_env['host_string'] + self._run(fabric_env=fabric_env) + assert '`host_string` not supplied' in str(exc_ctx.value) + + def test_missing_user(self): + with pytest.raises(TaskAbortException) as exc_ctx: + fabric_env = self.default_fabric_env.copy() + del fabric_env['user'] + self._run(fabric_env=fabric_env) + assert '`user` not supplied' in str(exc_ctx.value) + + def test_missing_key_or_password(self): + with pytest.raises(TaskAbortException) as exc_ctx: + fabric_env = self.default_fabric_env.copy() + del fabric_env['key_filename'] + self._run(fabric_env=fabric_env) + assert 'Access credentials not supplied' in str(exc_ctx.value) + + def test_hide_in_settings_and_non_viable_groups(self): + groups = ('running', 'stdout') + self._run(hide_output=groups) + assert set(self.mock.settings_merged['hide_output']) == set(groups) + with pytest.raises(TaskAbortException) as exc_ctx: + self._run(hide_output=('running', 'bla')) + assert '`hide_output` must be a subset of' in str(exc_ctx.value) + + def test_run_commands(self): + def test(use_sudo): + commands = ['command1', 'command2'] + self._run( + commands=commands, + use_sudo=use_sudo) + assert all(item in self.mock.settings_merged.items() for + item in self.default_fabric_env.items()) + assert self.mock.settings_merged['warn_only'] is True + assert self.mock.settings_merged['use_sudo'] == use_sudo + assert self.mock.commands == commands + self.mock.settings_merged = {} + self.mock.commands = [] + test(use_sudo=False) + test(use_sudo=True) + + def test_failed_command(self): + with pytest.raises(ProcessException) as exc_ctx: + self._run(commands=['fail']) + exception = exc_ctx.value + assert exception.stdout == self.MockCommandResult.stdout + assert exception.stderr == self.MockCommandResult.stderr + assert exception.command == self.MockCommandResult.command + assert exception.exit_code == self.MockCommandResult.return_code + + class MockCommandResult(object): + stdout = 'mock_stdout' + stderr = 'mock_stderr' + command = 'mock_command' + return_code = 1 + + def __init__(self, failed): + self.failed = failed + + class MockFabricApi(object): + + def __init__(self): + self.commands = [] + self.settings_merged = {} + + @contextlib.contextmanager + def settings(self, *args, **kwargs): + self.settings_merged.update(kwargs) + if args: + groups = args[0] + self.settings_merged.update({'hide_output': groups}) + yield + + def run(self, command): + self.commands.append(command) + self.settings_merged['use_sudo'] = False + return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail') + + def sudo(self, command): + self.commands.append(command) + self.settings_merged['use_sudo'] = True + return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail') + + def hide(self, *groups): + return groups + + def exists(self, *args, **kwargs): + raise RuntimeError + + class _Ctx(object): + class Stub(object): + @staticmethod + def abort(message=None): + model.Task.abort(message) + ip = None + task = Stub + task.runs_on = Stub + logger = logging.getLogger() + + @pytest.fixture(autouse=True) + def _setup(self, mocker): + self.default_fabric_env = { + 'host_string': 'test', + 'user': 'test', + 'key_filename': 'test', + } + self.mock = self.MockFabricApi() + mocker.patch('fabric.api', self.mock) + + def _run(self, + commands=(), + fabric_env=None, + process=None, + use_sudo=False, + hide_output=None): + operations.run_commands_with_ssh( + ctx=self._Ctx, + commands=commands, + process=process, + fabric_env=fabric_env or self.default_fabric_env, + use_sudo=use_sudo, + hide_output=hide_output) + + +class TestUtilityFunctions(object): + + def test_paths(self): + base_dir = '/path' + local_script_path = '/local/script/path.py' + paths = ssh_operations._Paths(base_dir=base_dir, + local_script_path=local_script_path) + assert paths.local_script_path == local_script_path + assert paths.remote_ctx_dir == base_dir + assert paths.base_script_path == 'path.py' + assert paths.remote_ctx_path == '/path/ctx' + assert paths.remote_scripts_dir == '/path/scripts' + assert paths.remote_work_dir == '/path/work' + assert paths.remote_env_script_path.startswith('/path/scripts/env-path.py-') + assert paths.remote_script_path.startswith('/path/scripts/path.py-') + + def test_write_environment_script_file(self): + base_dir = '/path' + local_script_path = '/local/script/path.py' + paths = ssh_operations._Paths(base_dir=base_dir, + local_script_path=local_script_path) + env = {'one': "'1'"} + local_socket_url = 'local_socket_url' + remote_socket_url = 'remote_socket_url' + env_script_lines = set([l for l in ssh_operations._write_environment_script_file( + process={'env': env}, + paths=paths, + local_socket_url=local_socket_url, + remote_socket_url=remote_socket_url + ).getvalue().split('\n') if l]) + expected_env_script_lines = set([ + 'export PATH=/path:$PATH', + 'export PYTHONPATH=/path:$PYTHONPATH', + 'chmod +x /path/ctx', + 'chmod +x {0}'.format(paths.remote_script_path), + 'export CTX_SOCKET_URL={0}'.format(remote_socket_url), + 'export LOCAL_CTX_SOCKET_URL={0}'.format(local_socket_url), + 'export one=\'1\'' + ]) + assert env_script_lines == expected_env_script_lines http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/workflows/api/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py index 58e387f..601c437 100644 --- a/tests/orchestrator/workflows/api/test_task.py +++ b/tests/orchestrator/workflows/api/test_task.py @@ -16,6 +16,7 @@ import pytest +from aria.storage import model from aria.orchestrator import context from aria.orchestrator.workflows import api @@ -72,6 +73,7 @@ class TestOperationTask(object): assert api_task.plugin == {'name': 'plugin', 'package_name': 'package', 'package_version': '0.1'} + assert api_task.runs_on == model.Task.RUNS_ON_NODE_INSTANCE def test_source_relationship_operation_task_creation(self, ctx): operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure' @@ -104,6 +106,7 @@ class TestOperationTask(object): assert api_task.plugin == {'name': 'plugin', 'package_name': 'package', 'package_version': '0.1'} + assert api_task.runs_on == model.Task.RUNS_ON_SOURCE def test_target_relationship_operation_task_creation(self, ctx): operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure' @@ -136,6 +139,7 @@ class TestOperationTask(object): assert api_task.plugin == {'name': 'plugin', 'package_name': 'package', 'package_version': '0.1'} + assert api_task.runs_on == model.Task.RUNS_ON_TARGET def test_operation_task_default_values(self, ctx): dependency_node_instance = ctx.model.node_instance.get_by_name( @@ -151,6 +155,7 @@ class TestOperationTask(object): assert task.max_attempts == ctx._task_max_attempts assert task.ignore_failure == ctx._task_ignore_failure assert task.plugin == {} + assert task.runs_on is None class TestWorkflowTask(object): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index 5381f5d..020de32 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -38,7 +38,7 @@ def ctx(tmpdir): class TestOperationTask(object): - def _create_operation_task(self, ctx, node_instance): + def _create_node_operation_task(self, ctx, node_instance): with workflow_context.current.push(ctx): api_task = api.task.OperationTask.node_instance( instance=node_instance, @@ -46,21 +46,31 @@ class TestOperationTask(object): core_task = core.task.OperationTask(api_task=api_task) return api_task, core_task - def test_operation_task_creation(self, ctx): + def _create_relationship_operation_task(self, ctx, relationship_instance, operation_end): + with workflow_context.current.push(ctx): + api_task = api.task.OperationTask.relationship_instance( + instance=relationship_instance, + name='aria.interfaces.relationship_lifecycle.preconfigure', + operation_end=operation_end) + core_task = core.task.OperationTask(api_task=api_task) + return api_task, core_task + + def test_node_operation_task_creation(self, ctx): storage_plugin = mock.models.get_plugin(package_name='p1', package_version='0.1') storage_plugin_other = mock.models.get_plugin(package_name='p0', package_version='0.0') ctx.model.plugin.put(storage_plugin_other) ctx.model.plugin.put(storage_plugin) - node_instance = \ - ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) + node_instance = ctx.model.node_instance.get_by_name( + mock.models.DEPENDENCY_NODE_INSTANCE_NAME) node = node_instance.node node.plugins = [{'name': 'plugin1', 'package_name': 'p1', 'package_version': '0.1'}] node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 'plugin1'} - api_task, core_task = self._create_operation_task(ctx, node_instance) + api_task, core_task = self._create_node_operation_task(ctx, node_instance) storage_task = ctx.model.task.get_by_name(core_task.name) assert storage_task.execution_name == ctx.execution.name + assert storage_task.runs_on.id == core_task.context.node_instance.id assert core_task.model_task == storage_task assert core_task.name == api_task.name assert core_task.operation_mapping == api_task.operation_mapping @@ -68,11 +78,25 @@ class TestOperationTask(object): assert core_task.inputs == api_task.inputs == storage_task.inputs assert core_task.plugin == storage_plugin + def test_source_relationship_operation_task_creation(self, ctx): + relationship_instance = ctx.model.relationship_instance.list()[0] + _, core_task = self._create_relationship_operation_task( + ctx, relationship_instance, + api.task.OperationTask.SOURCE_OPERATION) + assert core_task.model_task.runs_on.id == relationship_instance.source_node_instance.id + + def test_target_relationship_operation_task_creation(self, ctx): + relationship_instance = ctx.model.relationship_instance.list()[0] + _, core_task = self._create_relationship_operation_task( + ctx, relationship_instance, + api.task.OperationTask.TARGET_OPERATION) + assert core_task.model_task.runs_on.id == relationship_instance.target_node_instance.id + def test_operation_task_edit_locked_attribute(self, ctx): node_instance = \ ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) - _, core_task = self._create_operation_task(ctx, node_instance) + _, core_task = self._create_node_operation_task(ctx, node_instance) now = datetime.utcnow() with pytest.raises(exceptions.TaskException): core_task.status = core_task.STARTED @@ -89,7 +113,7 @@ class TestOperationTask(object): node_instance = \ ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME) - _, core_task = self._create_operation_task(ctx, node_instance) + _, core_task = self._create_node_operation_task(ctx, node_instance) future_time = datetime.utcnow() + timedelta(seconds=3) with core_task._update(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/workflows/helpers.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/helpers.py b/tests/orchestrator/workflows/helpers.py new file mode 100644 index 0000000..8e3f9b1 --- /dev/null +++ b/tests/orchestrator/workflows/helpers.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from contextlib import contextmanager + + +@contextmanager +def events_collector(*signals): + handlers = {} + collected = {} + + def handler_factory(key): + def handler(*args, **kwargs): + signal_events = collected.setdefault(key, []) + signal_events.append({'args': args, 'kwargs': kwargs}) + handlers[signal] = handler + return handler + + for signal in signals: + signal.connect(handler_factory(signal)) + try: + yield collected + finally: + for signal in signals: + signal.disconnect(handlers[signal]) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/resources/scripts/test_ssh.sh ---------------------------------------------------------------------- diff --git a/tests/resources/scripts/test_ssh.sh b/tests/resources/scripts/test_ssh.sh new file mode 100644 index 0000000..6f18278 --- /dev/null +++ b/tests/resources/scripts/test_ssh.sh @@ -0,0 +1,81 @@ +#!/bin/bash + +set -u +set -e + +test_run_script_basic() { + ctx node-instance runtime-properties test_value $test_value +} + +test_run_script_as_sudo() { + mkdir -p /opt/test_dir +} + +test_run_script_default_base_dir() { + ctx node-instance runtime-properties work_dir $PWD +} + +test_run_script_with_hide() { + true +} + +test_run_script_process_config() { + ctx node-instance runtime-properties env_value $test_value_env + ctx node-instance runtime-properties bash_version $BASH_VERSION + ctx node-instance runtime-properties arg1_value $1 + ctx node-instance runtime-properties arg2_value $2 + ctx node-instance runtime-properties cwd $PWD + ctx node-instance runtime-properties ctx_path $(which ctx) +} + +test_run_script_command_prefix() { + ctx node-instance runtime-properties dollar_dash $- +} + +test_run_script_reuse_existing_ctx_1() { + ctx node-instance runtime-properties test_value1 $test_value1 +} + +test_run_script_reuse_existing_ctx_2() { + ctx node-instance runtime-properties test_value2 $test_value2 +} + +test_run_script_download_resource_plain() { + local destination=$(mktemp) + ctx download-resource ${destination} test_resource + ctx node-instance runtime-properties test_value "$(cat ${destination})" +} + +test_run_script_download_resource_and_render() { + local destination=$(mktemp) + ctx download-resource-and-render ${destination} test_resource + ctx node-instance runtime-properties test_value "$(cat ${destination})" +} + +test_run_script_inputs_as_env_variables_no_override() { + ctx node-instance runtime-properties test_value "$custom_env_var" +} + +test_run_script_inputs_as_env_variables_process_env_override() { + ctx node-instance runtime-properties test_value "$custom_env_var" +} + +test_run_script_error_in_script() { + ctx property-that-does-not-exist +} + +test_run_script_abort_immediate() { + ctx task abort abort-message +} + +test_run_script_retry() { + ctx task retry retry-message +} + +test_run_script_abort_error_ignored_by_script() { + set +e + ctx task abort abort-message +} + +# Injected by test +${test_operation} $@ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/test_logger.py ---------------------------------------------------------------------- diff --git a/tests/test_logger.py b/tests/test_logger.py index 0199068..70f08bb 100644 --- a/tests/test_logger.py +++ b/tests/test_logger.py @@ -109,7 +109,7 @@ def test_loggermixin(capsys): test_string = 'loggermixing_test_string' - create_logger(handlers=[create_console_log_handler()]) + logger = create_logger(handlers=[create_console_log_handler()]) custom_class = type('CustomClass', (LoggerMixin,), {}).with_logger() custom_class.logger.debug(test_string) @@ -117,6 +117,9 @@ def test_loggermixin(capsys): _, err = capsys.readouterr() assert test_string in err + for handler in logger.handlers: + logger.removeHandler(handler) + # TODO: figure out what up with pickle # class_pickled = pickle.dumps(custom_class) # class_unpickled = pickle.loads(class_pickled) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/utils/test_exceptions.py ---------------------------------------------------------------------- diff --git a/tests/utils/test_exceptions.py b/tests/utils/test_exceptions.py new file mode 100644 index 0000000..5d030e2 --- /dev/null +++ b/tests/utils/test_exceptions.py @@ -0,0 +1,73 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import jsonpickle + +from aria.utils import exceptions + +_ARG1 = 'arg-1' +_ARG2 = 'arg-2' + + +class TestWrapIfNeeded(object): + + def test_no_wrapping_required1(self): + e = JsonPickleableException1(_ARG1, _ARG2) + assert exceptions.wrap_if_needed(e) is e + + def test_no_wrapping_required2(self): + e = JsonPickleableException1(arg1=_ARG1, arg2=_ARG2) + assert exceptions.wrap_if_needed(e) is e + + def test_no_wrapping_required3(self): + e = JsonPickleableException2(arg1=_ARG1, arg2=_ARG2) + assert exceptions.wrap_if_needed(e) is e + + def test_wrapping_required1(self): + e = NonJsonPickleableException(_ARG1, _ARG2) + wrapped_e = exceptions.wrap_if_needed(e) + wrapped_e = jsonpickle.loads(jsonpickle.dumps(wrapped_e)) + assert isinstance(wrapped_e, exceptions._WrappedException) + assert wrapped_e.exception_type == type(e).__name__ + assert wrapped_e.exception_str == str(e) + + def test_wrapping_required2(self): + e = NonJsonPickleableException(arg1=_ARG1, arg2=_ARG2) + wrapped_e = exceptions.wrap_if_needed(e) + wrapped_e = jsonpickle.loads(jsonpickle.dumps(wrapped_e)) + assert isinstance(wrapped_e, exceptions._WrappedException) + assert wrapped_e.exception_type == type(e).__name__ + assert wrapped_e.exception_str == str(e) + + +class JsonPickleableException1(Exception): + def __init__(self, arg1, arg2): + super(JsonPickleableException1, self).__init__(arg1, arg2) + self.arg1 = arg1 + self.arg2 = arg2 + + +class JsonPickleableException2(Exception): + def __init__(self, arg1=None, arg2=None): + super(JsonPickleableException2, self).__init__() + self.arg1 = arg1 + self.arg2 = arg2 + + +class NonJsonPickleableException(Exception): + def __init__(self, arg1, arg2): + super(NonJsonPickleableException, self).__init__() + self.arg1 = arg1 + self.arg2 = arg2 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tox.ini ---------------------------------------------------------------------- diff --git a/tox.ini b/tox.ini index 68f9ffa..fa4bd5c 100644 --- a/tox.ini +++ b/tox.ini @@ -14,6 +14,13 @@ envlist=py27,py26,pywin,pylint_code,pylint_tests [testenv] +passenv = + TRAVIS + PYTHON + PYTHON_VERSION + PYTHON_ARCH +setenv = + INSTALL_CTX=1 deps = -rrequirements.txt -rtests/requirements.txt