Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-46-execution-plugin a48d31c74 -> 433d55ea6 (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py 
b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
new file mode 100644
index 0000000..98ceff9
--- /dev/null
+++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
@@ -0,0 +1,359 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import time
+import sys
+import subprocess
+import StringIO
+
+import pytest
+
+from aria.orchestrator.execution_plugin import ctx_proxy
+
+
class TestCtxProxy(object):
    """End-to-end tests for the ctx proxy: a ``CtxProxy`` server wrapping a
    mock operation context, exercised through the ctx client request helper.
    """

    def test_attribute_access(self, server):
        response = self.request(server, 'stub_attr', 'some_property')
        assert response == 'some_value'

    def test_sugared_attribute_access(self, server):
        # Dashed names resolve the same as their underscored counterparts.
        response = self.request(server, 'stub-attr', 'some-property')
        assert response == 'some_value'

    def test_dict_prop_access_get_key(self, server):
        response = self.request(server, 'node', 'properties', 'prop1')
        assert response == 'value1'

    def test_dict_prop_access_get_key_nested(self, server):
        # Dotted paths reach into nested dicts.
        response = self.request(server, 'node', 'properties', 'prop2.nested_prop1')
        assert response == 'nested_value1'

    def test_dict_prop_access_get_with_list_index(self, server):
        # Bracketed indices reach into list-valued properties.
        response = self.request(server, 'node', 'properties', 'prop3[2].value')
        assert response == 'value_2'

    def test_dict_prop_access_set(self, server, ctx):
        # A trailing value argument turns the request into a set; missing
        # intermediate dicts ('prop4.some.new.path') are created on the way.
        self.request(server, 'node', 'properties', 'prop4.key', 'new_value')
        self.request(server, 'node', 'properties', 'prop3[2].value', 'new_value_2')
        self.request(server, 'node', 'properties', 'prop4.some.new.path',
                     'some_new_value')
        assert ctx.node.properties['prop4']['key'] == 'new_value'
        assert ctx.node.properties['prop3'][2]['value'] == 'new_value_2'
        assert ctx.node.properties['prop4']['some']['new']['path'] == 'some_new_value'

    def test_illegal_dict_access(self, server):
        self.request(server, 'node', 'properties', 'prop4.key', 'new_value')
        with pytest.raises(RuntimeError):
            # Two trailing arguments after the path is not a valid get/set form.
            self.request(server, 'node', 'properties', 'prop4.key', 'new_value', 'what')

    def test_method_invocation(self, server):
        args = ['arg1', 'arg2', 'arg3']
        response_args = self.request(server, 'stub-method', *args)
        assert response_args == args

    def test_method_invocation_no_args(self, server):
        response = self.request(server, 'stub-method')
        assert response == []

    def test_method_invocation_kwargs(self, server):
        # A trailing dict argument is applied as keyword arguments: arg4 is
        # overridden and arg5 lands in **kwargs (see stub_args below).
        arg1 = 'arg1'
        arg2 = 'arg2'
        arg4 = 'arg4_override'
        arg5 = 'arg5'
        kwargs = dict(
            arg4=arg4,
            arg5=arg5)
        response = self.request(server, 'stub_args', arg1, arg2, kwargs)
        assert response == dict(
            arg1=arg1,
            arg2=arg2,
            arg3='arg3',
            arg4=arg4,
            args=[],
            kwargs=dict(
                arg5=arg5))

    def test_empty_return_value(self, server):
        response = self.request(server, 'stub_none')
        assert response is None

    def test_client_request_timeout(self, server):
        # stub-sleep blocks for 0.5s while the client is told to wait 0.1s.
        with pytest.raises(IOError):
            ctx_proxy.client._client_request(server.socket_url,
                                             args=['stub-sleep', '0.5'],
                                             timeout=0.1)

    def test_processing_exception(self, server):
        with pytest.raises(ctx_proxy.client._RequestError):
            self.request(server, 'property_that_does_not_exist')

    def test_not_json_serializable(self, server):
        # NOTE(review): relies on the proxied ctx exposing a 'logger' whose
        # value cannot be JSON-serialized into a response — confirm.
        with pytest.raises(ctx_proxy.client._RequestError):
            self.request(server, 'logger')

    def test_no_string_arg(self, server):
        # Non-string arguments survive the request round trip unchanged.
        args = ['stub_method', 1, 2]
        response = self.request(server, *args)
        assert response == args[1:]

    class StubAttribute(object):
        # Simple attribute holder used by the attribute-access tests.
        some_property = 'some_value'

    class NodeAttribute(object):
        # Mimics a node exposing a plain ``properties`` dict.
        def __init__(self, properties):
            self.properties = properties

    @staticmethod
    def stub_method(*args):
        # Echoes its positional arguments back to the caller.
        return args

    @staticmethod
    def stub_sleep(seconds):
        # Blocks for the requested duration (used by the timeout test).
        time.sleep(float(seconds))

    @staticmethod
    def stub_args(arg1, arg2, arg3='arg3', arg4='arg4', *args, **kwargs):
        # Reflects its full call signature back as a dict.
        return dict(
            arg1=arg1,
            arg2=arg2,
            arg3=arg3,
            arg4=arg4,
            args=args,
            kwargs=kwargs)

    @pytest.fixture
    def ctx(self):
        """A mock operation context holding stub attributes/methods and a
        node with nested and list-valued properties."""
        class MockCtx(object):
            pass
        ctx = MockCtx()
        properties = {
            'prop1': 'value1',
            'prop2': {
                'nested_prop1': 'nested_value1'
            },
            'prop3': [
                {'index': 0, 'value': 'value_0'},
                {'index': 1, 'value': 'value_1'},
                {'index': 2, 'value': 'value_2'}
            ],
            'prop4': {
                'key': 'value'
            }
        }
        ctx.stub_none = None
        ctx.stub_method = self.stub_method
        ctx.stub_sleep = self.stub_sleep
        ctx.stub_args = self.stub_args
        ctx.stub_attr = self.StubAttribute()
        ctx.node = self.NodeAttribute(properties)
        return ctx

    @pytest.fixture
    def server(self, ctx):
        """A running CtxProxy server around the mock ctx, closed on teardown."""
        result = ctx_proxy.server.CtxProxy(ctx)
        yield result
        result.close()

    def request(self, server, *args):
        # Helper: issue one client request against the fixture server.
        return ctx_proxy.client._client_request(server.socket_url, args, timeout=5)
+
+
class TestArgumentParsing(object):
    """Tests for the ctx client's command-line parsing. ``_client_request``
    is patched out (see ``patch_client_request``), so each test only checks
    the socket url, args and timeout the client resolves from argv and the
    environment against ``self.expected``."""

    def test_socket_url_arg(self):
        self.expected.update(dict(socket_url='sock_url'))
        ctx_proxy.client.main(['--socket-url', self.expected.get('socket_url')])

    def test_socket_url_env(self):
        # Without --socket-url, the CTX_SOCKET_URL env var is used.
        expected_socket_url = 'env_sock_url'
        os.environ['CTX_SOCKET_URL'] = expected_socket_url
        self.expected.update(dict(socket_url=expected_socket_url))
        ctx_proxy.client.main([])

    def test_socket_url_missing(self):
        # Remove the default set by the autouse fixture: no flag and no env
        # var must be an error.
        del os.environ['CTX_SOCKET_URL']
        with pytest.raises(RuntimeError):
            ctx_proxy.client.main([])

    def test_args(self):
        self.expected.update(dict(args=['1', '2', '3']))
        ctx_proxy.client.main(self.expected.get('args'))

    def test_timeout(self):
        # Both the long and the short option spellings are accepted.
        self.expected.update(dict(timeout='10'))
        ctx_proxy.client.main(['--timeout', self.expected.get('timeout')])
        self.expected.update(dict(timeout='15'))
        ctx_proxy.client.main(['-t', self.expected.get('timeout')])

    def test_mixed_order(self):
        # Options and positional args may be interleaved in any order.
        self.expected.update(dict(
            args=['1', '2', '3'], timeout='20', socket_url='mixed_socket_url'))
        ctx_proxy.client.main(
            ['-t', self.expected.get('timeout')] +
            ['--socket-url', self.expected.get('socket_url')] +
            self.expected.get('args'))
        ctx_proxy.client.main(
            ['-t', self.expected.get('timeout')] +
            self.expected.get('args') +
            ['--socket-url', self.expected.get('socket_url')])
        ctx_proxy.client.main(
            self.expected.get('args') +
            ['-t', self.expected.get('timeout')] +
            ['--socket-url', self.expected.get('socket_url')])

    def test_json_args(self):
        # Args prefixed with '@' are parsed as JSON values.
        args = ['@1', '@[1,2,3]', '@{"key":"value"}']
        expected_args = [1, [1, 2, 3], {'key': 'value'}]
        self.expected.update(dict(args=expected_args))
        ctx_proxy.client.main(args)

    def test_json_arg_prefix(self):
        # --json-arg-prefix replaces the default '@' prefix, so '@1' is now
        # a plain string while '_1' is parsed as JSON.
        args = ['_1', '@1']
        expected_args = [1, '@1']
        self.expected.update(dict(args=expected_args))
        ctx_proxy.client.main(args + ['--json-arg-prefix', '_'])

    def test_json_output(self):
        # Triples of (mocked response, plain output, -j/--json-output output).
        self.assert_valid_output('string', 'string', '"string"')
        self.assert_valid_output(1, '1', '1')
        self.assert_valid_output([1, '2'], "[1, '2']", '[1, "2"]')
        self.assert_valid_output({'key': 1},
                                 "{'key': 1}",
                                 '{"key": 1}')
        self.assert_valid_output(False, '', 'false')
        self.assert_valid_output(True, 'True', 'true')
        self.assert_valid_output([], '', '[]')
        self.assert_valid_output({}, '', '{}')

    def assert_valid_output(self, response, ex_typed_output, ex_json_output):
        """Run the client with *response* as the mocked reply and compare
        what it prints to stdout with and without JSON output enabled."""
        self.mock_response = response
        current_stdout = sys.stdout

        def run(args, expected):
            output = StringIO.StringIO()
            sys.stdout = output
            ctx_proxy.client.main(args)
            assert output.getvalue() == expected

        try:
            run([], ex_typed_output)
            run(['-j'], ex_json_output)
            run(['--json-output'], ex_json_output)
        finally:
            # Always restore the real stdout, even on assertion failure.
            sys.stdout = current_stdout

    def mock_client_request(self, socket_url, args, timeout):
        # Stand-in for _client_request: assert the values the client parsed
        # match the per-test expectations, then return the canned response.
        assert socket_url == self.expected.get('socket_url')
        assert args == self.expected.get('args')
        assert timeout == int(self.expected.get('timeout'))
        return self.mock_response

    @pytest.fixture(autouse=True)
    def patch_client_request(self, mocker):
        mocker.patch.object(ctx_proxy.client,
                            ctx_proxy.client._client_request.__name__,
                            self.mock_client_request)
        # Provide a default socket url via the environment for every test.
        mocker.patch.dict('os.environ', {'CTX_SOCKET_URL': 'stub'})

    @pytest.fixture(autouse=True)
    def defaults(self):
        # Baseline expectations; individual tests override what they need.
        self.expected = dict(args=[], timeout=30, socket_url='stub')
        self.mock_response = None
+
+
class TestCtxEntryPoint(object):
    """Sanity check for the installed ``ctx`` console-script entry point."""

    def test_ctx_in_path(self):
        # Spawn the entry point, swallow its output, and require a zero
        # exit status — i.e. the script exists on PATH and runs.
        process = subprocess.Popen(
            ['ctx', '--help'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        process.communicate()
        assert process.wait() == 0
+
+
class TestPathDictAccess(object):
    """Behavioral tests for ``_PathDictAccess``: dotted-path and bracketed
    index get/set operations over a plain dictionary."""

    def test_simple_set(self):
        data = {}
        ctx_proxy.server._PathDictAccess(data).set('foo', 42)
        assert data == {'foo': 42}

    def test_nested_set(self):
        data = {'foo': {}}
        ctx_proxy.server._PathDictAccess(data).set('foo.bar', 42)
        assert data == {'foo': {'bar': 42}}

    def test_set_index(self):
        data = {'foo': [None, {'bar': 0}]}
        ctx_proxy.server._PathDictAccess(data).set('foo[1].bar', 42)
        assert data == {'foo': [None, {'bar': 42}]}

    def test_set_nonexistent_parent(self):
        # Missing intermediate dicts are created on set.
        data = {}
        ctx_proxy.server._PathDictAccess(data).set('foo.bar', 42)
        assert data == {'foo': {'bar': 42}}

    def test_set_nonexistent_parent_nested(self):
        data = {}
        ctx_proxy.server._PathDictAccess(data).set('foo.bar.baz', 42)
        assert data == {'foo': {'bar': {'baz': 42}}}

    def test_simple_get(self):
        accessor = ctx_proxy.server._PathDictAccess({'foo': 42})
        assert accessor.get('foo') == 42

    def test_nested_get(self):
        accessor = ctx_proxy.server._PathDictAccess({'foo': {'bar': 42}})
        assert accessor.get('foo.bar') == 42

    def test_nested_get_shadows_dotted_name(self):
        # The nested path wins over a literal 'foo.bar' top-level key.
        accessor = ctx_proxy.server._PathDictAccess(
            {'foo': {'bar': 42}, 'foo.bar': 58})
        assert accessor.get('foo.bar') == 42

    def test_index_get(self):
        accessor = ctx_proxy.server._PathDictAccess({'foo': [0, 1]})
        assert accessor.get('foo[1]') == 1

    def test_get_nonexistent(self):
        accessor = ctx_proxy.server._PathDictAccess({})
        with pytest.raises(RuntimeError):
            accessor.get('foo')

    def test_get_by_index_not_list(self):
        # Indexing into a non-list value is an error.
        accessor = ctx_proxy.server._PathDictAccess({'foo': {0: 'not-list'}})
        with pytest.raises(RuntimeError):
            accessor.get('foo[0]')

    def test_get_by_index_nonexistent_parent(self):
        accessor = ctx_proxy.server._PathDictAccess({})
        with pytest.raises(RuntimeError):
            accessor.get('foo[1]')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_global_ctx.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_global_ctx.py 
b/tests/orchestrator/execution_plugin/test_global_ctx.py
new file mode 100644
index 0000000..dad7547
--- /dev/null
+++ b/tests/orchestrator/execution_plugin/test_global_ctx.py
@@ -0,0 +1,28 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria.orchestrator import execution_plugin
+
+
def test_python_script_scope():
    """The scope context manager exposes the operation ctx/inputs as module
    globals only while the scope is active."""
    # Outside any scope both globals are unset.
    assert execution_plugin.ctx is None
    assert execution_plugin.inputs is None
    fake_ctx = object()
    fake_inputs = object()
    with execution_plugin.python_script_scope(operation_ctx=fake_ctx,
                                              operation_inputs=fake_inputs):
        # Inside the scope the exact same objects are exposed.
        assert execution_plugin.ctx is fake_ctx
        assert execution_plugin.inputs is fake_inputs
    # On exit the globals are cleared again.
    assert execution_plugin.ctx is None
    assert execution_plugin.inputs is None

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py 
b/tests/orchestrator/execution_plugin/test_local.py
new file mode 100644
index 0000000..497da48
--- /dev/null
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -0,0 +1,587 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+
+import pytest
+
+from aria import workflow
+from aria.orchestrator import events
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
+from aria.orchestrator.execution_plugin import operations
+from aria.orchestrator.execution_plugin.exceptions import ProcessException
+from aria.orchestrator.execution_plugin import local
+from aria.orchestrator.execution_plugin import constants
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator.workflows.core import engine
+
+from tests import mock, storage
+from tests.orchestrator.workflows.helpers import events_collector
+
+IS_WINDOWS = os.name == 'nt'
+
+
+class TestLocalRunScript(object):
+
    def test_script_path_parameter(self, executor, workflow_context, tmpdir):
        """A script referenced by ``script_path`` runs and can use the ctx
        CLI to write runtime properties."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx node-instance runtime-properties map.key value
            ''',
            windows_script='''
            ctx node-instance runtime-properties map.key value
        ''')
        props = self._run(
            executor, workflow_context,
            script_path=script_path)
        assert props['map']['key'] == 'value'
+
    def test_process_env(self, executor, workflow_context, tmpdir):
        """Entries in ``process.env`` are visible to the script as
        environment variables."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx node-instance runtime-properties map.key1 $key1
            ctx node-instance runtime-properties map.key2 $key2
            ''',
            windows_script='''
            ctx node-instance runtime-properties map.key1 %key1%
            ctx node-instance runtime-properties map.key2 %key2%
        ''')
        props = self._run(
            executor, workflow_context,
            script_path=script_path,
            process={
                'env': {
                    'key1': 'value1',
                    'key2': 'value2'
                }
            })
        p_map = props['map']
        assert p_map['key1'] == 'value1'
        assert p_map['key2'] == 'value2'
+
    def test_process_cwd(self, executor, workflow_context, tmpdir):
        """``process.cwd`` sets the working directory the script runs in."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx node-instance runtime-properties map.cwd $PWD
            ''',
            windows_script='''
            ctx node-instance runtime-properties map.cwd %CD%
            ''')
        tmpdir = str(tmpdir)
        props = self._run(
            executor, workflow_context,
            script_path=script_path,
            process={
                'cwd': tmpdir
            })
        p_map = props['map']
        assert p_map['cwd'] == tmpdir
+
    def test_process_command_prefix(self, executor, workflow_context, tmpdir):
        """``process.command_prefix`` prepends an interpreter (here
        ``python``) to the script invocation, so a suffix-less Python script
        still executes."""
        use_ctx = 'ctx node-instance runtime-properties map.key value'
        # NOTE(review): the adjacent string literals below concatenate to
        # "....split()).communicate()[0]" — i.e. an argument-less split().
        # Possibly a mangled ".split(' ')"; behavior is the same for this
        # single-space-separated command. Confirm against upstream source.
        python_script = ['import subprocess',
                         'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
        python_script = '\n'.join(python_script)
        script_path = self._create_script(
            tmpdir,
            linux_script=python_script,
            windows_script=python_script,
            windows_suffix='',
            linux_suffix='')
        props = self._run(
            executor, workflow_context,
            script_path=script_path,
            process={
                'env': {'TEST_KEY': 'value'},
                'command_prefix': 'python'
            })
        p_map = props['map']
        assert p_map['key'] == 'value'
+
    def test_process_args(self, executor, workflow_context, tmpdir):
        """``process.args`` entries are passed to the script as positional
        arguments ($1, $2 / %1, %2)."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx node-instance runtime-properties map.arg1 "$1"
            ctx node-instance runtime-properties map.arg2 $2
            ''',
            windows_script='''
            ctx node-instance runtime-properties map.arg1 %1
            ctx node-instance runtime-properties map.arg2 %2
            ''')
        props = self._run(
            executor, workflow_context,
            script_path=script_path,
            process={
                'args': ['"arg with spaces"', 'arg2']
            })
        assert props['map']['arg1'] == 'arg with spaces'
        assert props['map']['arg2'] == 'arg2'
+
    def test_no_script_path(self, executor, workflow_context):
        """Omitting ``script_path`` aborts the task with a message naming
        the missing input."""
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=None)
        assert isinstance(exception, TaskAbortException)
        assert 'script_path' in exception.message
+
+    def test_script_error(self, executor, workflow_context, tmpdir):
+        script_path = self._create_script(
+            tmpdir,
+            linux_script='''#! /bin/bash -e
+            echo 123123
+            command_that_does_not_exist
+            ''',
+            windows_script='''
+            @echo off
+            echo 123123
+            command_that_does_not_exist
+            ''')
+        exception = self._run_and_get_task_exception(
+            executor, workflow_context,
+            script_path=script_path)
+        assert isinstance(exception, ProcessException)
+        assert os.path.basename(script_path) in exception.command
+        assert exception.exit_code == 1 if IS_WINDOWS else 127
+        assert exception.stdout.strip() == '123123'
+        assert 'command_that_does_not_exist' in exception.stderr
+
    def test_script_error_from_bad_ctx_request(self, executor, workflow_context, tmpdir):
        """A failed ctx request inside the script fails the whole script and
        puts the proxy's RequestError on stderr."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx property_that_does_not_exist
            ''',
            windows_script='''
            ctx property_that_does_not_exist
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, ProcessException)
        assert os.path.basename(script_path) in exception.command
        assert exception.exit_code == 1
        assert 'RequestError' in exception.stderr
        assert 'property_that_does_not_exist' in exception.stderr
+
    def test_python_script(self, executor, workflow_context, tmpdir):
        """A ``.py`` script can import ``ctx``/``inputs`` from the execution
        plugin module and write runtime properties directly."""
        script = '''
from aria.orchestrator.execution_plugin import ctx, inputs
if __name__ == '__main__':
    ctx.node_instance.runtime_properties['key'] = inputs['key']
'''
        suffix = '.py'
        script_path = self._create_script(
            tmpdir,
            linux_script=script,
            windows_script=script,
            linux_suffix=suffix,
            windows_suffix=suffix)
        props = self._run(
            executor, workflow_context,
            script_path=script_path,
            inputs={'key': 'value'})
        assert props['key'] == 'value'
+
    @pytest.mark.parametrize(
        'value', ['string-value', [1, 2, 3], 999, 3.14, False,
                  {'complex1': {'complex2': {'key': 'value'}, 'list': [1, 2, 3]}}])
    def test_inputs_as_environment_variables(self, executor, workflow_context, tmpdir, value):
        """Operation inputs are exposed to the script as environment
        variables; non-string values arrive JSON-encoded."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx node-instance runtime-properties key "${input_as_env_var}"
            ''',
            windows_script='''
            ctx node-instance runtime-properties key "%input_as_env_var%"
        ''')
        props = self._run(
            executor, workflow_context,
            script_path=script_path,
            env_var=value)
        # Strings pass through as-is; anything else is decoded from JSON
        # before being compared to the original input value.
        expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
        assert expected == value
+
    @pytest.mark.parametrize('value', ['override', {'key': 'value'}])
    def test_explicit_env_variables_inputs_override(
            self, executor, workflow_context, tmpdir, value):
        """An explicit ``process.env`` entry overrides the environment
        variable derived from an operation input of the same name."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx node-instance runtime-properties key "${input_as_env_var}"
            ''',
            windows_script='''
            ctx node-instance runtime-properties key "%input_as_env_var%"
        ''')

        props = self._run(
            executor, workflow_context,
            script_path=script_path,
            env_var='test-value',
            process={
                'env': {
                    'input_as_env_var': value
                }
            })
        # Strings pass through as-is; anything else is decoded from JSON.
        expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
        assert expected == value
+
    def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
        """Reading a runtime property that was never set fails the script
        with a RequestError naming the missing key."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx node-instance runtime-properties nonexistent
            ''',
            windows_script='''
            ctx node-instance runtime-properties nonexistent
        ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, ProcessException)
        assert os.path.basename(script_path) in exception.command
        assert 'RequestError' in exception.stderr
        assert 'nonexistent' in exception.stderr
+
    def test_get_nonexistent_runtime_property_json(self, executor, workflow_context, tmpdir):
        """Same as the non-JSON variant: a missing runtime property fails the
        script even when the ``-j`` (JSON output) flag is used."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx -j instance runtime-properties nonexistent
            ''',
            windows_script='''
            ctx -j instance runtime-properties nonexistent
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, ProcessException)
        assert os.path.basename(script_path) in exception.command
        assert 'RequestError' in exception.stderr
        assert 'nonexistent' in exception.stderr
+
    def test_abort(self, executor, workflow_context, tmpdir):
        """``ctx task abort`` surfaces as a TaskAbortException carrying the
        supplied message."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx task abort abort-message
            ''',
            windows_script='''
            ctx task abort abort-message
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, TaskAbortException)
        assert exception.message == 'abort-message'
+
    def test_retry(self, executor, workflow_context, tmpdir):
        """``ctx task retry`` surfaces as a TaskRetryException carrying the
        supplied message."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx task retry retry-message
            ''',
            windows_script='''
            ctx task retry retry-message
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, TaskRetryException)
        assert exception.message == 'retry-message'
+
    def test_retry_with_interval(self, executor, workflow_context, tmpdir):
        """The JSON-prefixed ``@100`` argument is parsed as the retry
        interval on the resulting TaskRetryException."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx task retry retry-message @100
            ''',
            windows_script='''
            ctx task retry retry-message @100
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, TaskRetryException)
        assert exception.message == 'retry-message'
        assert exception.retry_interval == 100
+
    def test_crash_abort_after_retry(self, executor, workflow_context, tmpdir):
        """Requesting an abort after a retry was already requested is an
        illegal ctx operation and aborts the task with the standard message.
        (No ``-e`` flag: the script must survive the failing first call.)"""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash
            ctx task retry retry-message
            ctx task abort should-raise-a-runtime-error
            ''',
            windows_script='''
            ctx task retry retry-message
            ctx task abort should-raise-a-runtime-error
        ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, TaskAbortException)
        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
    def test_crash_retry_after_abort(self, executor, workflow_context, tmpdir):
        """Requesting a retry after an abort was already requested is an
        illegal ctx operation and aborts the task with the standard message."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash
            ctx task abort abort-message
            ctx task retry should-raise-a-runtime-error
            ''',
            windows_script='''
            ctx task abort abort-message
            ctx task retry should-raise-a-runtime-error
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, TaskAbortException)
        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
    def test_crash_abort_after_abort(self, executor, workflow_context, tmpdir):
        """A second abort after a first abort is an illegal ctx operation
        and aborts the task with the standard message."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash
            ctx task abort abort-message
            ctx task abort should-raise-a-runtime-error
            ''',
            windows_script='''
            ctx task abort abort-message
            ctx task abort should-raise-a-runtime-error
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, TaskAbortException)
        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
    def test_crash_retry_after_retry(self, executor, workflow_context, tmpdir):
        """A second retry after a first retry is an illegal ctx operation
        and aborts the task with the standard message."""
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash
            ctx task retry retry-message
            ctx task retry should-raise-a-runtime-error
            ''',
            windows_script='''
            ctx task retry retry-message
            ctx task retry should-raise-a-runtime-error
            ''')
        exception = self._run_and_get_task_exception(
            executor, workflow_context,
            script_path=script_path)
        assert isinstance(exception, TaskAbortException)
        assert exception.message == constants.ILLEGAL_CTX_OPERATION_MESSAGE
+
    def test_retry_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
        """``ctx task retry`` exits non-zero, so under ``bash -e`` (or the
        explicit errorlevel check on Windows) nothing after it runs — the
        log must contain only the retry message, not 'should-not-run'."""
        log_path = tmpdir.join('temp.log')
        message = 'message'
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx task retry '{0}' 2> {1}
            echo should-not-run > {1}
            '''.format(message, log_path),
            windows_script='''
            ctx task retry "{0}" 2> {1}
            if %errorlevel% neq 0 exit /b %errorlevel%
            echo should-not-run > {1}
            '''.format(message, log_path))
        with pytest.raises(ExecutorException):
            self._run(
                executor, workflow_context,
                script_path=script_path)
        assert log_path.read().strip() == message
+
    def test_abort_returns_a_nonzero_exit_code(self, executor, workflow_context, tmpdir):
        """``ctx task abort`` must exit non-zero so error-checked scripts stop."""
        log_path = tmpdir.join('temp.log')
        message = 'message'
        # The abort message is written to stderr and redirected into log_path.
        # The echo line must never run: the ctx call's non-zero exit code
        # terminates the script (``bash -e`` on linux, the explicit errorlevel
        # check on windows).
        script_path = self._create_script(
            tmpdir,
            linux_script='''#! /bin/bash -e
            ctx task abort '{0}' 2> {1}
            echo should-not-run > {1}
            '''.format(message, log_path),
            windows_script='''
            ctx task abort "{0}" 2> {1}
            if %errorlevel% neq 0 exit /b %errorlevel%
            echo should-not-run > {1}
            '''.format(message, log_path))
        with pytest.raises(ExecutorException):
            self._run(
                executor, workflow_context,
                script_path=script_path)
        # Only the abort message reached the log - the echo never ran.
        assert log_path.read().strip() == message
+
+    def _create_script(self,
+                       tmpdir,
+                       linux_script,
+                       windows_script,
+                       windows_suffix='.bat',
+                       linux_suffix=''):
+        suffix = windows_suffix if IS_WINDOWS else linux_suffix
+        script = windows_script if IS_WINDOWS else linux_script
+        script_path = tmpdir.join('script{0}'.format(suffix))
+        script_path.write(script)
+        return str(script_path)
+
+    def _run_and_get_task_exception(self, *args, **kwargs):
+        signal = events.on_failure_task_signal
+        with events_collector(signal) as collected:
+            with pytest.raises(ExecutorException):
+                self._run(*args, **kwargs)
+        return collected[signal][0]['kwargs']['exception']
+
    def _run(self,
             executor,
             workflow_context,
             script_path,
             process=None,
             env_var='value',
             inputs=None):
        """Upload *script_path*, run it through a one-task workflow and return
        the dependency node instance's runtime properties.

        :param executor: executor the workflow engine runs tasks with
        :param workflow_context: workflow context (provides model and storage)
        :param script_path: local path of the script to upload and execute
        :param process: optional process configuration for the operation
        :param env_var: value exposed to the script via the
            ``input_as_env_var`` operation input
        :param inputs: extra operation inputs merged with the standard ones
        """
        local_script_path = script_path
        script_path = os.path.basename(local_script_path) if local_script_path else None
        if script_path:
            # Make the script available as a deployment resource so the
            # operation can download it at run time.
            workflow_context.resource.deployment.upload(
                entry_id=str(workflow_context.deployment.id),
                source=local_script_path,
                path=script_path)

        inputs = inputs or {}
        inputs.update({
            'script_path': script_path,
            'process': process,
            'input_as_env_var': env_var
        })

        @workflow
        def mock_workflow(ctx, graph):
            # Single-task graph wiring run_script_locally to the dependency
            # node instance's 'test.op' operation.
            op = 'test.op'
            node_instance = ctx.model.node_instance.get_by_name(
                mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
            node_instance.node.operations[op] = {
                'operation': '{0}.{1}'.format(operations.__name__,
                                              operations.run_script_locally.__name__)}
            graph.add_tasks(api.task.OperationTask.node_instance(
                instance=node_instance,
                name=op,
                inputs=inputs))
            return graph
        tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
        eng = engine.Engine(
            executor=executor,
            workflow_context=workflow_context,
            tasks_graph=tasks_graph)
        eng.execute()
        return workflow_context.model.node_instance.get_by_name(
            mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties
+
+    @pytest.fixture
+    def executor(self):
+        result = process.ProcessExecutor()
+        yield result
+        result.close()
+
+    @pytest.fixture
+    def workflow_context(self, tmpdir):
+        workflow_context = mock.context.simple(
+            storage.get_sqlite_api_kwargs(str(tmpdir)),
+            resources_dir=str(tmpdir.join('resources')))
+        workflow_context.states = []
+        workflow_context.exception = None
+        yield workflow_context
+        storage.release_sqlite_storage(workflow_context.model)
+
+
class BaseTestConfiguration(object):
    """Shared plumbing for tests asserting how ``local.run_script``
    dispatches between python evaluation and subprocess execution.

    The autouse fixture replaces both dispatch targets with recording stubs;
    subclasses inspect ``self.called`` and ``self.process`` afterwards.
    """

    @pytest.fixture(autouse=True)
    def mock_execute(self, mocker):
        """Patch the eval/execute entry points with recording stubs."""
        def record_eval(**_):
            self.called = 'eval'

        def record_execute(process, **_):
            self.process = process
            self.called = 'execute'

        self.process = {}
        self.called = None
        mocker.patch.object(local, '_execute_func', record_execute)
        mocker.patch.object(local, '_eval_script_func', record_eval)

    class Ctx(object):
        """Minimal ctx stand-in; downloading a resource echoes its destination."""
        @staticmethod
        def download_resource(destination, *args, **kwargs):
            return destination

    def _run(self, script_path, process=None):
        """Invoke run_script with the stub ctx and the given process config."""
        local.run_script(
            script_path=script_path,
            process=process,
            ctx=self.Ctx)
+
+
class TestPowerShellConfiguration(BaseTestConfiguration):
    """How run_script picks the powershell command prefix for .ps1 scripts."""

    def test_implicit_powershell_call_with_ps1_extension(self):
        """A .ps1 extension implies the 'powershell' prefix."""
        self._run(script_path='script_path.ps1')
        prefix = self.process['command_prefix']
        assert prefix == 'powershell'

    def test_command_prefix_is_overridden_for_ps1_extension(self):
        """An explicitly configured prefix wins over the .ps1 default."""
        self._run(script_path='script_path.ps1',
                  process={'command_prefix': 'bash'})
        prefix = self.process['command_prefix']
        assert prefix == 'bash'

    def test_explicit_powershell_call(self):
        """Explicitly requesting powershell behaves like the implicit case."""
        self._run(script_path='script_path.ps1',
                  process={'command_prefix': 'powershell'})
        prefix = self.process['command_prefix']
        assert prefix == 'powershell'
+
+
class TestEvalPythonConfiguration(BaseTestConfiguration):
    """How run_script chooses between evaluating python and plain execution.

    An explicit ``eval_python`` flag always wins; without it, a ``.py``
    extension implies evaluation and anything else implies execution.
    """

    def _assert_dispatch(self, expected, script_path, process=None):
        """Run and assert which of the stubbed entry points was invoked."""
        self._run(script_path=script_path, process=process)
        assert self.called == expected

    def test_explicit_eval_without_py_extension(self):
        self._assert_dispatch('eval', 'script_path', {'eval_python': True})

    def test_explicit_eval_with_py_extension(self):
        self._assert_dispatch('eval', 'script_path.py', {'eval_python': True})

    def test_implicit_eval(self):
        self._assert_dispatch('eval', 'script_path.py')

    def test_explicit_execute_without_py_extension(self):
        self._assert_dispatch('execute', 'script_path', {'eval_python': False})

    def test_explicit_execute_with_py_extension(self):
        self._assert_dispatch('execute', 'script_path.py', {'eval_python': False})

    def test_implicit_execute(self):
        self._assert_dispatch('execute', 'script_path')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py 
b/tests/orchestrator/execution_plugin/test_ssh.py
new file mode 100644
index 0000000..6b5c783
--- /dev/null
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -0,0 +1,481 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+import json
+import logging
+import os
+
+import pytest
+
+import fabric.api
+from fabric.contrib import files
+from fabric import context_managers
+
+from aria.storage import model
+from aria.orchestrator import events
+from aria.orchestrator import workflow
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.exceptions import ExecutorException
+from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
+from aria.orchestrator.execution_plugin import operations
+from aria.orchestrator.execution_plugin import constants
+from aria.orchestrator.execution_plugin.exceptions import ProcessException, 
TaskException
+from aria.orchestrator.execution_plugin.ssh import operations as ssh_operations
+
+from tests import mock, storage, resources
+from tests.orchestrator.workflows.helpers import events_collector
+
+_CUSTOM_BASE_DIR = '/tmp/new-aria-ctx'
+
+_FABRIC_ENV = {
+    'host_string': 'localhost',
+    'user': 'travis',
+    'password': 'travis'
+}
+
+
@pytest.mark.skipif(not os.environ.get('TRAVIS'), reason='actual ssh server required')
class TestWithActualSSHServer(object):
    """Integration tests that run scripts/commands over a live SSH connection
    (localhost with the travis credentials from ``_FABRIC_ENV``).

    Each test passes its own name as the ``test_operation`` input (see
    ``_execute``), which presumably selects the matching function inside
    ``tests/resources/scripts/test_ssh.sh`` — the assertions then inspect the
    runtime properties the script wrote.
    """

    def test_run_script_basic(self):
        expected_runtime_property_value = 'some_value'
        props = self._execute(env={'test_value': expected_runtime_property_value})
        assert props['test_value'] == expected_runtime_property_value

    @pytest.mark.skip(reason='sudo privileges are required')
    def test_run_script_as_sudo(self):
        self._execute(use_sudo=True)
        with self._ssh_env():
            assert files.exists('/opt/test_dir')
            fabric.api.sudo('rm -rf /opt/test_dir')

    def test_run_script_default_base_dir(self):
        props = self._execute()
        assert props['work_dir'] == '{0}/work'.format(constants.DEFAULT_BASE_DIR)

    @pytest.mark.skip(reason='Re-enable once output from process executor can be captured')
    @pytest.mark.parametrize('hide_groups', [[], ['everything']])
    def test_run_script_with_hide(self, hide_groups):
        self._execute(hide_output=hide_groups)
        output = 'TODO'
        expected_log_message = ('[localhost] run: source {0}/scripts/'
                                .format(constants.DEFAULT_BASE_DIR))
        if hide_groups:
            assert expected_log_message not in output
        else:
            assert expected_log_message in output

    def test_run_script_process_config(self):
        """env, args, cwd and base_dir from the process config all reach the script."""
        expected_env_value = 'test_value_env'
        expected_arg1_value = 'test_value_arg1'
        expected_arg2_value = 'test_value_arg2'
        expected_cwd = '/tmp'
        expected_base_dir = _CUSTOM_BASE_DIR
        props = self._execute(
            env={'test_value_env': expected_env_value},
            process={
                'args': [expected_arg1_value, expected_arg2_value],
                'cwd': expected_cwd,
                'base_dir': expected_base_dir
            })
        assert props['env_value'] == expected_env_value
        assert len(props['bash_version']) > 0
        assert props['arg1_value'] == expected_arg1_value
        assert props['arg2_value'] == expected_arg2_value
        assert props['cwd'] == expected_cwd
        assert props['ctx_path'] == '{0}/ctx'.format(expected_base_dir)

    def test_run_script_command_prefix(self):
        # 'bash -i' starts an interactive shell, which shows up as 'i' in $-.
        props = self._execute(process={'command_prefix': 'bash -i'})
        assert 'i' in props['dollar_dash']

    def test_run_script_reuse_existing_ctx(self):
        # Two operations run in sequence; both of their runtime properties
        # must be present afterwards.
        expected_test_value_1 = 'test_value_1'
        expected_test_value_2 = 'test_value_2'
        props = self._execute(
            test_operations=['{0}_1'.format(self.test_name),
                             '{0}_2'.format(self.test_name)],
            env={'test_value1': expected_test_value_1,
                 'test_value2': expected_test_value_2})
        assert props['test_value1'] == expected_test_value_1
        assert props['test_value2'] == expected_test_value_2

    def test_run_script_download_resource_plain(self, tmpdir):
        resource = tmpdir.join('resource')
        resource.write('content')
        self._upload(str(resource), 'test_resource')
        props = self._execute()
        assert props['test_value'] == 'content'

    def test_run_script_download_resource_and_render(self, tmpdir):
        # The downloaded resource is rendered as a template against ctx.
        resource = tmpdir.join('resource')
        resource.write('{{ctx.deployment.name}}')
        self._upload(str(resource), 'test_resource')
        props = self._execute()
        assert props['test_value'] == self._workflow_context.deployment.name

    @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
    def test_run_script_inputs_as_env_variables_no_override(self, value):
        props = self._execute(custom_input=value)
        return_value = props['test_value']
        # Non-string inputs reach the script JSON-encoded.
        expected = return_value if isinstance(value, basestring) else json.loads(return_value)
        assert value == expected

    @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
    def test_run_script_inputs_as_env_variables_process_env_override(self, value):
        # A process.env entry overrides the same-named operation input.
        props = self._execute(custom_input='custom-input-value',
                              env={'custom_env_var': value})
        return_value = props['test_value']
        expected = return_value if isinstance(value, basestring) else json.loads(return_value)
        assert value == expected

    def test_run_script_error_in_script(self):
        exception = self._execute_and_get_task_exception()
        assert isinstance(exception, TaskException)

    def test_run_script_abort_immediate(self):
        exception = self._execute_and_get_task_exception()
        assert isinstance(exception, TaskAbortException)
        assert exception.message == 'abort-message'

    def test_run_script_retry(self):
        exception = self._execute_and_get_task_exception()
        assert isinstance(exception, TaskRetryException)
        assert exception.message == 'retry-message'

    def test_run_script_abort_error_ignored_by_script(self):
        exception = self._execute_and_get_task_exception()
        assert isinstance(exception, TaskAbortException)
        assert exception.message == 'abort-message'

    def test_run_commands(self):
        temp_file_path = '/tmp/very_temporary_file'
        with self._ssh_env():
            if files.exists(temp_file_path):
                fabric.api.run('rm {0}'.format(temp_file_path))
        self._execute(commands=['touch {0}'.format(temp_file_path)])
        with self._ssh_env():
            assert files.exists(temp_file_path)
            fabric.api.run('rm {0}'.format(temp_file_path))

    @pytest.fixture(autouse=True)
    def _setup(self, request, workflow_context, executor, capfd):
        """Capture fixtures on self and wipe remote base dirs before each test."""
        self._workflow_context = workflow_context
        self._executor = executor
        self._capfd = capfd
        # originalname strips pytest parametrization suffixes so the name can
        # be used as-is by the remote test script.
        self.test_name = request.node.originalname or request.node.name
        with self._ssh_env():
            for directory in [constants.DEFAULT_BASE_DIR, _CUSTOM_BASE_DIR]:
                if files.exists(directory):
                    fabric.api.run('rm -rf {0}'.format(directory))

    @contextlib.contextmanager
    def _ssh_env(self):
        """Fabric settings targeting the test SSH server, all output hidden."""
        with self._capfd.disabled():
            with context_managers.settings(fabric.api.hide('everything'),
                                           **_FABRIC_ENV):
                yield

    def _execute(self,
                 env=None,
                 use_sudo=False,
                 hide_output=None,
                 process=None,
                 custom_input='',
                 test_operations=None,
                 commands=None):
        """Upload the ssh test script, run one operation per entry in
        *test_operations* (default: the current test name) over SSH and
        return the resulting runtime properties.

        When *commands* is given, run_commands_with_ssh is used instead of
        run_script_with_ssh.
        """
        process = process or {}
        if env:
            process.setdefault('env', {}).update(env)

        test_operations = test_operations or [self.test_name]

        local_script_path = os.path.join(resources.DIR, 'scripts', 'test_ssh.sh')
        script_path = os.path.basename(local_script_path)
        self._upload(local_script_path, script_path)

        if commands:
            operation = operations.run_commands_with_ssh
        else:
            operation = operations.run_script_with_ssh

        @workflow
        def mock_workflow(ctx, graph):
            # One sequential task per requested test operation, all bound to
            # the dependency node instance's 'test.op' operation.
            op = 'test.op'
            node_instance = ctx.model.node_instance.get_by_name(
                mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
            node_instance.node.operations[op] = {
                'operation': '{0}.{1}'.format(operations.__name__, operation.__name__)}
            graph.sequence(*[api.task.OperationTask.node_instance(
                instance=node_instance,
                name=op,
                inputs={
                    'script_path': script_path,
                    'fabric_env': _FABRIC_ENV,
                    'process': process,
                    'use_sudo': use_sudo,
                    'hide_output': hide_output,
                    'custom_env_var': custom_input,
                    'test_operation': test_operation,
                    'commands': commands
                }) for test_operation in test_operations])
            return graph
        tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
        eng = engine.Engine(
            executor=self._executor,
            workflow_context=self._workflow_context,
            tasks_graph=tasks_graph)
        eng.execute()
        return self._workflow_context.model.node_instance.get_by_name(
            mock.models.DEPENDENCY_NODE_INSTANCE_NAME).runtime_properties

    def _execute_and_get_task_exception(self, *args, **kwargs):
        """Run, expect failure, and return the failed task's exception."""
        signal = events.on_failure_task_signal
        with events_collector(signal) as collected:
            with pytest.raises(ExecutorException):
                self._execute(*args, **kwargs)
        return collected[signal][0]['kwargs']['exception']

    def _upload(self, source, path):
        """Store *source* as a deployment resource under *path*."""
        self._workflow_context.resource.deployment.upload(
            entry_id=str(self._workflow_context.deployment.id),
            source=source,
            path=path)

    @pytest.fixture
    def executor(self):
        # Process-based executor, closed on teardown.
        result = process.ProcessExecutor()
        yield result
        result.close()

    @pytest.fixture
    def workflow_context(self, tmpdir):
        # Mock workflow context backed by per-test sqlite storage.
        workflow_context = mock.context.simple(
            storage.get_sqlite_api_kwargs(str(tmpdir)),
            resources_dir=str(tmpdir.join('resources')))
        workflow_context.states = []
        workflow_context.exception = None
        yield workflow_context
        storage.release_sqlite_storage(workflow_context.model)
+
+
class TestFabricEnvHideGroupsAndRunCommands(object):
    """Unit tests for run_commands_with_ssh using a mocked ``fabric.api``.

    ``MockFabricApi`` records the merged fabric settings and the commands that
    were run/sudo'ed, so no real SSH connection is ever made.
    """

    def test_fabric_env_default_override(self):
        # first sanity for no override
        self._run()
        assert self.mock.settings_merged['timeout'] == constants.FABRIC_ENV_DEFAULTS['timeout']
        # now override
        invocation_fabric_env = self.default_fabric_env.copy()
        timeout = 1000000
        invocation_fabric_env['timeout'] = timeout
        self._run(fabric_env=invocation_fabric_env)
        assert self.mock.settings_merged['timeout'] == timeout

    def test_implicit_host_string(self, mocker):
        # With no host_string in fabric_env, the ip of the instance the task
        # runs on is used instead.
        expected_ip = '1.1.1.1'
        mocker.patch.object(self._Ctx.task.runs_on, 'ip', expected_ip)
        fabric_env = self.default_fabric_env.copy()
        del fabric_env['host_string']
        self._run(fabric_env=fabric_env)
        assert self.mock.settings_merged['host_string'] == expected_ip

    def test_explicit_host_string(self):
        fabric_env = self.default_fabric_env.copy()
        host_string = 'explicit_host_string'
        fabric_env['host_string'] = host_string
        self._run(fabric_env=fabric_env)
        assert self.mock.settings_merged['host_string'] == host_string

    def test_override_warn_only(self):
        # warn_only defaults to True but an explicit False must be respected.
        fabric_env = self.default_fabric_env.copy()
        self._run(fabric_env=fabric_env)
        assert self.mock.settings_merged['warn_only'] is True
        fabric_env = self.default_fabric_env.copy()
        fabric_env['warn_only'] = False
        self._run(fabric_env=fabric_env)
        assert self.mock.settings_merged['warn_only'] is False

    def test_missing_host_string(self):
        with pytest.raises(TaskAbortException) as exc_ctx:
            fabric_env = self.default_fabric_env.copy()
            del fabric_env['host_string']
            self._run(fabric_env=fabric_env)
        assert '`host_string` not supplied' in str(exc_ctx.value)

    def test_missing_user(self):
        with pytest.raises(TaskAbortException) as exc_ctx:
            fabric_env = self.default_fabric_env.copy()
            del fabric_env['user']
            self._run(fabric_env=fabric_env)
        assert '`user` not supplied' in str(exc_ctx.value)

    def test_missing_key_or_password(self):
        with pytest.raises(TaskAbortException) as exc_ctx:
            fabric_env = self.default_fabric_env.copy()
            del fabric_env['key_filename']
            self._run(fabric_env=fabric_env)
        assert 'Access credentials not supplied' in str(exc_ctx.value)

    def test_hide_in_settings_and_non_viable_groups(self):
        groups = ('running', 'stdout')
        self._run(hide_output=groups)
        assert set(self.mock.settings_merged['hide_output']) == set(groups)
        # unknown hide group names must be rejected
        with pytest.raises(TaskAbortException) as exc_ctx:
            self._run(hide_output=('running', 'bla'))
        assert '`hide_output` must be a subset of' in str(exc_ctx.value)

    def test_run_commands(self):
        def test(use_sudo):
            commands = ['command1', 'command2']
            self._run(
                commands=commands,
                use_sudo=use_sudo)
            assert all(item in self.mock.settings_merged.items() for
                       item in self.default_fabric_env.items())
            assert self.mock.settings_merged['warn_only'] is True
            assert self.mock.settings_merged['use_sudo'] == use_sudo
            assert self.mock.commands == commands
            # reset recorded state between the non-sudo and sudo sub-runs
            self.mock.settings_merged = {}
            self.mock.commands = []
        test(use_sudo=False)
        test(use_sudo=True)

    def test_failed_command(self):
        with pytest.raises(ProcessException) as exc_ctx:
            self._run(commands=['fail'])
        exception = exc_ctx.value
        assert exception.stdout == self.MockCommandResult.stdout
        assert exception.stderr == self.MockCommandResult.stderr
        assert exception.command == self.MockCommandResult.command
        assert exception.exit_code == self.MockCommandResult.return_code

    class MockCommandResult(object):
        # Mimics the result object returned by fabric's run/sudo.
        stdout = 'mock_stdout'
        stderr = 'mock_stderr'
        command = 'mock_command'
        return_code = 1

        def __init__(self, failed):
            self.failed = failed

    class MockFabricApi(object):
        """Stand-in for ``fabric.api`` that records settings and commands."""

        def __init__(self):
            self.commands = []
            self.settings_merged = {}

        @contextlib.contextmanager
        def settings(self, *args, **kwargs):
            # Record kwargs; a positional arg is the hide-groups tuple
            # produced by hide() below.
            self.settings_merged.update(kwargs)
            if args:
                groups = args[0]
                self.settings_merged.update({'hide_output': groups})
            yield

        def run(self, command):
            self.commands.append(command)
            self.settings_merged['use_sudo'] = False
            return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')

        def sudo(self, command):
            self.commands.append(command)
            self.settings_merged['use_sudo'] = True
            return TestFabricEnvHideGroupsAndRunCommands.MockCommandResult(command == 'fail')

        def hide(self, *groups):
            return groups

        def exists(self, *args, **kwargs):
            # These tests must never probe a remote filesystem.
            raise RuntimeError

    class _Ctx(object):
        # Minimal ctx double: only task.runs_on.ip, task.abort and a logger
        # are needed by run_commands_with_ssh.
        class Stub(object):
            @staticmethod
            def abort(message=None):
                model.Task.abort(message)
            ip = None
        task = Stub
        task.runs_on = Stub
        logger = logging.getLogger()

    @pytest.fixture(autouse=True)
    def _setup(self, mocker):
        """Install the fabric.api mock and a minimal valid fabric env."""
        self.default_fabric_env = {
            'host_string': 'test',
            'user': 'test',
            'key_filename': 'test',
        }
        self.mock = self.MockFabricApi()
        mocker.patch('fabric.api', self.mock)

    def _run(self,
             commands=(),
             fabric_env=None,
             process=None,
             use_sudo=False,
             hide_output=None):
        """Invoke run_commands_with_ssh with the stub ctx and given config."""
        operations.run_commands_with_ssh(
            ctx=self._Ctx,
            commands=commands,
            process=process,
            fabric_env=fabric_env or self.default_fabric_env,
            use_sudo=use_sudo,
            hide_output=hide_output)
+
+
class TestUtilityFunctions(object):
    """Unit tests for the ssh operations module's path and env-script helpers."""

    def test_paths(self):
        base_dir = '/path'
        local_script_path = '/local/script/path.py'
        paths = ssh_operations._Paths(base_dir=base_dir,
                                      local_script_path=local_script_path)
        assert paths.local_script_path == local_script_path
        assert paths.remote_ctx_dir == base_dir
        assert paths.base_script_path == 'path.py'
        assert paths.remote_ctx_path == '/path/ctx'
        assert paths.remote_scripts_dir == '/path/scripts'
        assert paths.remote_work_dir == '/path/work'
        # remote script paths carry a generated suffix, so only prefixes are
        # checked here
        assert paths.remote_env_script_path.startswith('/path/scripts/env-path.py-')
        assert paths.remote_script_path.startswith('/path/scripts/path.py-')

    def test_write_environment_script_file(self):
        base_dir = '/path'
        local_script_path = '/local/script/path.py'
        paths = ssh_operations._Paths(base_dir=base_dir,
                                      local_script_path=local_script_path)
        env = {'one': "'1'"}
        local_socket_url = 'local_socket_url'
        remote_socket_url = 'remote_socket_url'
        # Collect the non-empty lines of the generated environment script.
        env_script_lines = set([l for l in ssh_operations._write_environment_script_file(
            process={'env': env},
            paths=paths,
            local_socket_url=local_socket_url,
            remote_socket_url=remote_socket_url
        ).getvalue().split('\n') if l])
        expected_env_script_lines = set([
            'export PATH=/path:$PATH',
            'export PYTHONPATH=/path:$PYTHONPATH',
            'chmod +x /path/ctx',
            'chmod +x {0}'.format(paths.remote_script_path),
            'export CTX_SOCKET_URL={0}'.format(remote_socket_url),
            'export LOCAL_CTX_SOCKET_URL={0}'.format(local_socket_url),
            'export one=\'1\''
        ])
        assert env_script_lines == expected_env_script_lines

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task.py 
b/tests/orchestrator/workflows/api/test_task.py
index 58e387f..601c437 100644
--- a/tests/orchestrator/workflows/api/test_task.py
+++ b/tests/orchestrator/workflows/api/test_task.py
@@ -16,6 +16,7 @@
 
 import pytest
 
+from aria.storage import model
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api
 
@@ -72,6 +73,7 @@ class TestOperationTask(object):
         assert api_task.plugin == {'name': 'plugin',
                                    'package_name': 'package',
                                    'package_version': '0.1'}
+        assert api_task.runs_on == model.Task.RUNS_ON_NODE_INSTANCE
 
     def test_source_relationship_operation_task_creation(self, ctx):
         operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure'
@@ -104,6 +106,7 @@ class TestOperationTask(object):
         assert api_task.plugin == {'name': 'plugin',
                                    'package_name': 'package',
                                    'package_version': '0.1'}
+        assert api_task.runs_on == model.Task.RUNS_ON_SOURCE
 
     def test_target_relationship_operation_task_creation(self, ctx):
         operation_name = 'aria.interfaces.relationship_lifecycle.preconfigure'
@@ -136,6 +139,7 @@ class TestOperationTask(object):
         assert api_task.plugin == {'name': 'plugin',
                                    'package_name': 'package',
                                    'package_version': '0.1'}
+        assert api_task.runs_on == model.Task.RUNS_ON_TARGET
 
     def test_operation_task_default_values(self, ctx):
         dependency_node_instance = ctx.model.node_instance.get_by_name(
@@ -151,6 +155,7 @@ class TestOperationTask(object):
         assert task.max_attempts == ctx._task_max_attempts
         assert task.ignore_failure == ctx._task_ignore_failure
         assert task.plugin == {}
+        assert task.runs_on is None
 
 
 class TestWorkflowTask(object):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py 
b/tests/orchestrator/workflows/core/test_task.py
index 5381f5d..020de32 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -38,7 +38,7 @@ def ctx(tmpdir):
 
 class TestOperationTask(object):
 
-    def _create_operation_task(self, ctx, node_instance):
+    def _create_node_operation_task(self, ctx, node_instance):
         with workflow_context.current.push(ctx):
             api_task = api.task.OperationTask.node_instance(
                 instance=node_instance,
@@ -46,21 +46,31 @@ class TestOperationTask(object):
             core_task = core.task.OperationTask(api_task=api_task)
         return api_task, core_task
 
-    def test_operation_task_creation(self, ctx):
+    def _create_relationship_operation_task(self, ctx, relationship_instance, 
operation_end):
+        with workflow_context.current.push(ctx):
+            api_task = api.task.OperationTask.relationship_instance(
+                instance=relationship_instance,
+                name='aria.interfaces.relationship_lifecycle.preconfigure',
+                operation_end=operation_end)
+            core_task = core.task.OperationTask(api_task=api_task)
+        return api_task, core_task
+
+    def test_node_operation_task_creation(self, ctx):
         storage_plugin = mock.models.get_plugin(package_name='p1', 
package_version='0.1')
         storage_plugin_other = mock.models.get_plugin(package_name='p0', 
package_version='0.0')
         ctx.model.plugin.put(storage_plugin_other)
         ctx.model.plugin.put(storage_plugin)
-        node_instance = \
-            
ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+        node_instance = ctx.model.node_instance.get_by_name(
+            mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
         node = node_instance.node
         node.plugins = [{'name': 'plugin1',
                          'package_name': 'p1',
                          'package_version': '0.1'}]
         node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 
'plugin1'}
-        api_task, core_task = self._create_operation_task(ctx, node_instance)
+        api_task, core_task = self._create_node_operation_task(ctx, 
node_instance)
         storage_task = ctx.model.task.get_by_name(core_task.name)
         assert storage_task.execution_name == ctx.execution.name
+        assert storage_task.runs_on.id == core_task.context.node_instance.id
         assert core_task.model_task == storage_task
         assert core_task.name == api_task.name
         assert core_task.operation_mapping == api_task.operation_mapping
@@ -68,11 +78,25 @@ class TestOperationTask(object):
         assert core_task.inputs == api_task.inputs == storage_task.inputs
         assert core_task.plugin == storage_plugin
 
+    def test_source_relationship_operation_task_creation(self, ctx):
+        relationship_instance = ctx.model.relationship_instance.list()[0]
+        _, core_task = self._create_relationship_operation_task(
+            ctx, relationship_instance,
+            api.task.OperationTask.SOURCE_OPERATION)
+        assert core_task.model_task.runs_on.id == 
relationship_instance.source_node_instance.id
+
+    def test_target_relationship_operation_task_creation(self, ctx):
+        relationship_instance = ctx.model.relationship_instance.list()[0]
+        _, core_task = self._create_relationship_operation_task(
+            ctx, relationship_instance,
+            api.task.OperationTask.TARGET_OPERATION)
+        assert core_task.model_task.runs_on.id == 
relationship_instance.target_node_instance.id
+
     def test_operation_task_edit_locked_attribute(self, ctx):
         node_instance = \
             
ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
 
-        _, core_task = self._create_operation_task(ctx, node_instance)
+        _, core_task = self._create_node_operation_task(ctx, node_instance)
         now = datetime.utcnow()
         with pytest.raises(exceptions.TaskException):
             core_task.status = core_task.STARTED
@@ -89,7 +113,7 @@ class TestOperationTask(object):
         node_instance = \
             
ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
 
-        _, core_task = self._create_operation_task(ctx, node_instance)
+        _, core_task = self._create_node_operation_task(ctx, node_instance)
         future_time = datetime.utcnow() + timedelta(seconds=3)
 
         with core_task._update():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/orchestrator/workflows/helpers.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/helpers.py 
b/tests/orchestrator/workflows/helpers.py
new file mode 100644
index 0000000..8e3f9b1
--- /dev/null
+++ b/tests/orchestrator/workflows/helpers.py
@@ -0,0 +1,37 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from contextlib import contextmanager
+
+
@contextmanager
def events_collector(*signals):
    """Collect every event emitted by *signals* for the duration of the block.

    Yields a dict mapping each signal that fired to a list of
    ``{'args': ..., 'kwargs': ...}`` entries, one per emission (signals that
    never fire get no entry). All handlers are disconnected on exit, even if
    the managed block raises.

    :param signals: signal objects exposing ``connect``/``disconnect``
    """
    handlers = {}
    collected = {}

    def handler_factory(key):
        def handler(*args, **kwargs):
            signal_events = collected.setdefault(key, [])
            signal_events.append({'args': args, 'kwargs': kwargs})
        # Register under the factory's own parameter. The original stored
        # under the enclosing loop variable `signal`, which only worked
        # because the factory happened to be called inside the loop (and
        # would NameError if called any other way).
        handlers[key] = handler
        return handler

    for signal in signals:
        signal.connect(handler_factory(signal))
    try:
        yield collected
    finally:
        # Always detach, so a failing test body cannot leak handlers into
        # subsequent tests.
        for signal in signals:
            signal.disconnect(handlers[signal])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/resources/scripts/test_ssh.sh
----------------------------------------------------------------------
diff --git a/tests/resources/scripts/test_ssh.sh 
b/tests/resources/scripts/test_ssh.sh
new file mode 100644
index 0000000..6f18278
--- /dev/null
+++ b/tests/resources/scripts/test_ssh.sh
@@ -0,0 +1,81 @@
#!/bin/bash

# Each function below is one scenario for the ssh execution-plugin tests;
# the test injects ${test_operation} (and any env vars it reads) and this
# script dispatches to it at the bottom.

set -u
set -e

test_run_script_basic() {
    # Quoted so values containing whitespace reach ctx as a single argument.
    ctx node-instance runtime-properties test_value "$test_value"
}

test_run_script_as_sudo() {
    mkdir -p /opt/test_dir
}

test_run_script_default_base_dir() {
    ctx node-instance runtime-properties work_dir "$PWD"
}

test_run_script_with_hide() {
    true
}

test_run_script_process_config() {
    ctx node-instance runtime-properties env_value "$test_value_env"
    ctx node-instance runtime-properties bash_version "$BASH_VERSION"
    ctx node-instance runtime-properties arg1_value "$1"
    ctx node-instance runtime-properties arg2_value "$2"
    ctx node-instance runtime-properties cwd "$PWD"
    ctx node-instance runtime-properties ctx_path "$(which ctx)"
}

test_run_script_command_prefix() {
    ctx node-instance runtime-properties dollar_dash "$-"
}

test_run_script_reuse_existing_ctx_1() {
    ctx node-instance runtime-properties test_value1 "$test_value1"
}

test_run_script_reuse_existing_ctx_2() {
    ctx node-instance runtime-properties test_value2 "$test_value2"
}

test_run_script_download_resource_plain() {
    local destination
    destination=$(mktemp)
    ctx download-resource "${destination}" test_resource
    ctx node-instance runtime-properties test_value "$(cat "${destination}")"
}

test_run_script_download_resource_and_render() {
    local destination
    destination=$(mktemp)
    ctx download-resource-and-render "${destination}" test_resource
    ctx node-instance runtime-properties test_value "$(cat "${destination}")"
}

test_run_script_inputs_as_env_variables_no_override() {
    ctx node-instance runtime-properties test_value "$custom_env_var"
}

test_run_script_inputs_as_env_variables_process_env_override() {
    ctx node-instance runtime-properties test_value "$custom_env_var"
}

test_run_script_error_in_script() {
    # Deliberately invalid ctx invocation; set -e turns it into a failure.
    ctx property-that-does-not-exist
}

test_run_script_abort_immediate() {
    ctx task abort abort-message
}

test_run_script_retry() {
    ctx task retry retry-message
}

test_run_script_abort_error_ignored_by_script() {
    # set +e on purpose: the scenario checks abort wins over ignored errors.
    set +e
    ctx task abort abort-message
}

# Injected by test; "$@" preserves argument boundaries.
${test_operation} "$@"

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 0199068..70f08bb 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -109,7 +109,7 @@ def test_loggermixin(capsys):
 
     test_string = 'loggermixing_test_string'
 
-    create_logger(handlers=[create_console_log_handler()])
+    logger = create_logger(handlers=[create_console_log_handler()])
 
     custom_class = type('CustomClass', (LoggerMixin,), {}).with_logger()
     custom_class.logger.debug(test_string)
@@ -117,6 +117,9 @@ def test_loggermixin(capsys):
     _, err = capsys.readouterr()
     assert test_string in err
 
+    for handler in logger.handlers:
+        logger.removeHandler(handler)
+
     # TODO: figure out what up with pickle
     # class_pickled = pickle.dumps(custom_class)
     # class_unpickled = pickle.loads(class_pickled)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tests/utils/test_exceptions.py
----------------------------------------------------------------------
diff --git a/tests/utils/test_exceptions.py b/tests/utils/test_exceptions.py
new file mode 100644
index 0000000..5d030e2
--- /dev/null
+++ b/tests/utils/test_exceptions.py
@@ -0,0 +1,73 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import jsonpickle
+
+from aria.utils import exceptions
+
+_ARG1 = 'arg-1'
+_ARG2 = 'arg-2'
+
+
class TestWrapIfNeeded(object):
    """Tests for ``exceptions.wrap_if_needed``.

    Exceptions that survive a jsonpickle round trip are returned unchanged;
    anything else is wrapped in ``exceptions._WrappedException``.
    """

    def test_no_wrapping_required1(self):
        e = JsonPickleableException1(_ARG1, _ARG2)
        assert exceptions.wrap_if_needed(e) is e

    def test_no_wrapping_required2(self):
        e = JsonPickleableException1(arg1=_ARG1, arg2=_ARG2)
        assert exceptions.wrap_if_needed(e) is e

    def test_no_wrapping_required3(self):
        e = JsonPickleableException2(arg1=_ARG1, arg2=_ARG2)
        assert exceptions.wrap_if_needed(e) is e

    def test_wrapping_required1(self):
        self._assert_wrapped(NonJsonPickleableException(_ARG1, _ARG2))

    def test_wrapping_required2(self):
        self._assert_wrapped(NonJsonPickleableException(arg1=_ARG1, arg2=_ARG2))

    def _assert_wrapped(self, original):
        """Assert *original* gets wrapped and the wrapper itself round-trips
        through jsonpickle while preserving type name and message."""
        wrapped = exceptions.wrap_if_needed(original)
        wrapped = jsonpickle.loads(jsonpickle.dumps(wrapped))
        assert isinstance(wrapped, exceptions._WrappedException)
        assert wrapped.exception_type == type(original).__name__
        assert wrapped.exception_str == str(original)
+
+
class JsonPickleableException1(Exception):
    """Test fixture: forwards both constructor arguments to ``Exception``,
    so ``args`` is populated and the instance can be rebuilt on unpickling."""

    def __init__(self, arg1, arg2):
        super(JsonPickleableException1, self).__init__(arg1, arg2)
        self.arg1, self.arg2 = arg1, arg2
+
+
class JsonPickleableException2(Exception):
    """Test fixture: takes only optional keyword arguments, so a no-argument
    reconstruction succeeds even though ``args`` stays empty."""

    def __init__(self, arg1=None, arg2=None):
        super(JsonPickleableException2, self).__init__()
        self.arg1, self.arg2 = arg1, arg2
+
+
class NonJsonPickleableException(Exception):
    """Test fixture: requires positional arguments but does not pass them to
    ``Exception``, so a naive no-argument reconstruction cannot rebuild it."""

    def __init__(self, arg1, arg2):
        super(NonJsonPickleableException, self).__init__()
        self.arg1, self.arg2 = arg1, arg2

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/433d55ea/tox.ini
----------------------------------------------------------------------
diff --git a/tox.ini b/tox.ini
index 68f9ffa..fa4bd5c 100644
--- a/tox.ini
+++ b/tox.ini
@@ -14,6 +14,13 @@
 envlist=py27,py26,pywin,pylint_code,pylint_tests
 
 [testenv]
+passenv =
+    TRAVIS
+    PYTHON
+    PYTHON_VERSION
+    PYTHON_ARCH
+setenv =
+    INSTALL_CTX=1
 deps =
     -rrequirements.txt
     -rtests/requirements.txt

Reply via email to