ARIA-46 Execution plugin

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/3e0a578d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/3e0a578d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/3e0a578d

Branch: refs/heads/ARIA-46-execution-plugin
Commit: 3e0a578d314f82c4425e2c7749614bbe3f4bdd5b
Parents: 385b209
Author: Dan Kilman <d...@gigaspaces.com>
Authored: Tue Jan 3 17:00:46 2017 +0200
Committer: Dan Kilman <d...@gigaspaces.com>
Committed: Thu Jan 19 14:15:56 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/context/common.py             |   4 +-
 aria/orchestrator/context/operation.py          |   5 +-
 aria/orchestrator/exceptions.py                 |   2 +-
 aria/orchestrator/execution_plugin/__init__.py  |  33 ++
 aria/orchestrator/execution_plugin/common.py    | 145 +++++
 aria/orchestrator/execution_plugin/constants.py |  51 ++
 .../execution_plugin/ctx_proxy/__init__.py      |  16 +
 .../execution_plugin/ctx_proxy/client.py        | 105 ++++
 .../execution_plugin/ctx_proxy/server.py        | 239 ++++++++
 .../execution_plugin/environment_globals.py     |  60 ++
 .../orchestrator/execution_plugin/exceptions.py |  36 ++
 aria/orchestrator/execution_plugin/local.py     | 125 ++++
 .../orchestrator/execution_plugin/operations.py |  63 ++
 .../execution_plugin/ssh/__init__.py            |  14 +
 .../execution_plugin/ssh/operations.py          | 192 ++++++
 .../orchestrator/execution_plugin/ssh/tunnel.py |  91 +++
 aria/orchestrator/workflows/api/task.py         |  12 +-
 aria/orchestrator/workflows/core/engine.py      |   1 -
 aria/orchestrator/workflows/core/task.py        |   3 +-
 aria/orchestrator/workflows/executor/process.py |   3 +-
 aria/storage/base_model.py                      |  24 +-
 aria/storage/filesystem_rapi.py                 |   2 +-
 aria/utils/exceptions.py                        |  19 +
 requirements.txt                                |   3 +
 setup.py                                        |   9 +-
 tests/orchestrator/execution_plugin/__init__.py |  14 +
 .../execution_plugin/test_common.py             | 191 ++++++
 .../execution_plugin/test_ctx_proxy_server.py   | 359 ++++++++++++
 .../execution_plugin/test_global_ctx.py         |  28 +
 .../orchestrator/execution_plugin/test_local.py | 587 +++++++++++++++++++
 tests/orchestrator/execution_plugin/test_ssh.py | 481 +++++++++++++++
 tests/orchestrator/workflows/api/test_task.py   |   5 +
 tests/orchestrator/workflows/core/test_task.py  |  38 +-
 tests/orchestrator/workflows/helpers.py         |  37 ++
 tests/resources/scripts/test_ssh.sh             |  81 +++
 tests/test_logger.py                            |   5 +-
 tests/utils/test_exceptions.py                  |  73 +++
 tox.ini                                         |   7 +
 38 files changed, 3137 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 53844e8..691c17d 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -113,9 +113,7 @@ class BaseContext(logger.LoggerMixin):
         template using the provided variables. ctx is available to the template without providing it
         explicitly.
         """
-        self.download_resource(destination=destination, path=path)
-        with open(destination, 'rb') as f:
-            resource_content = f.read()
+        resource_content = self.get_resource(path=path)
         resource_content = 
self._render_resource(resource_content=resource_content,
                                                  variables=variables)
         with open(destination, 'wb') as f:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 19bb73a..b33d107 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -42,6 +42,7 @@ class BaseOperationContext(BaseContext):
             **kwargs)
         self._task_id = task_id
         self._actor_id = actor_id
+        self._task = None
 
     def __repr__(self):
         details = 'operation_mapping={task.operation_mapping}; ' \
@@ -55,7 +56,9 @@ class BaseOperationContext(BaseContext):
         The task in the model storage
         :return: Task model
         """
-        return self.model.task.get(self._task_id)
+        if not self._task:
+            self._task = self.model.task.get(self._task_id)
+        return self._task
 
 
 class NodeOperationContext(BaseOperationContext):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index bd5238e..c00b66b 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -36,7 +36,7 @@ class TaskRetryException(RuntimeError):
     """
     Used internally when ctx.task.retry is called
     """
-    def __init__(self, message, retry_interval):
+    def __init__(self, message, retry_interval=None):
         super(TaskRetryException, self).__init__(message)
         self.retry_interval = retry_interval
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/__init__.py b/aria/orchestrator/execution_plugin/__init__.py
new file mode 100644
index 0000000..372022f
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/__init__.py
@@ -0,0 +1,33 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from contextlib import contextmanager
+
+# Populated during execution of python scripts
+ctx = None
+inputs = None
+
+
+@contextmanager
+def python_script_scope(operation_ctx, operation_inputs):
+    global ctx
+    global inputs
+    try:
+        ctx = operation_ctx
+        inputs = operation_inputs
+        yield
+    finally:
+        ctx = None
+        inputs = None

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py
new file mode 100644
index 0000000..a64a955
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/common.py
@@ -0,0 +1,145 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+import tempfile
+
+import requests
+
+from . import constants
+from . import exceptions
+
+
+def is_windows():
+    return os.name == 'nt'
+
+
+def download_script(ctx, script_path):
+    split = script_path.split('://')
+    schema = split[0]
+    suffix = script_path.split('/')[-1]
+    file_descriptor, dest_script_path = 
tempfile.mkstemp(suffix='-{0}'.format(suffix))
+    os.close(file_descriptor)
+    try:
+        if schema in ['http', 'https']:
+            response = requests.get(script_path)
+            if response.status_code == 404:
+                ctx.task.abort('Failed to download script: {0} (status code: 
{1})'
+                               .format(script_path, response.status_code))
+            content = response.text
+            with open(dest_script_path, 'wb') as f:
+                f.write(content)
+        else:
+            ctx.download_resource(destination=dest_script_path, 
path=script_path)
+    except:
+        os.remove(dest_script_path)
+        raise
+    return dest_script_path
+
+
+def create_process_config(script_path, process, operation_kwargs, 
quote_json_env_vars=False):
+    """
+    update a process with its environment variables, and return it.
+
+    Get a dict representing a process and a dict representing the environment
+    variables. Convert each environment variable to a format of
+    <string representing the name of the variable> :
+    <json formatted string representing the value of the variable>.
+    Finally, update the process with the newly formatted environment variables,
+    and return the process.
+
+    :param process: a dict representing a process
+    :type process: dict
+    :param operation_kwargs: a dict representing environment variables that
+    should exist in the process' running environment.
+    :type operation_kwargs: dict
+    :return: the process updated with its environment variables.
+    :rtype: dict
+    """
+    process = process or {}
+    env_vars = operation_kwargs.copy()
+    if 'ctx' in env_vars:
+        del env_vars['ctx']
+    env_vars.update(process.get('env', {}))
+    for k, v in env_vars.items():
+        if isinstance(v, (dict, list, tuple, bool, int, float)):
+            env_var_value = json.dumps(v)
+            if is_windows():
+                # These <k,v> environment variables will subsequently
+                # be used in a subprocess.Popen() call, as the `env` parameter.
+                # In some windows python versions, if an environment variable
+                # name is not of type str (e.g. unicode), the Popen call will
+                # fail.
+                k = str(k)
+                # The windows shell removes all double quotes - escape them
+                # to still be able to pass JSON in env vars to the shell.
+                env_var_value = env_var_value.replace('"', '\\"')
+            if quote_json_env_vars:
+                env_var_value = "'{0}'".format(env_var_value)
+            env_vars[k] = env_var_value
+    process['env'] = env_vars
+    args = process.get('args')
+    command = script_path
+    command_prefix = process.get('command_prefix')
+    if command_prefix:
+        command = '{0} {1}'.format(command_prefix, command)
+    if args:
+        command = ' '.join([command] + [str(a) for a in args])
+    process['command'] = command
+    return process
+
+
+def patch_ctx(ctx):
+    ctx._error = None
+    task = ctx.task
+    task._original_abort = task.abort
+    task._original_retry = task.retry
+
+    def _validate_legal_action():
+        if ctx._error is not None:
+            ctx._error = RuntimeError(constants.ILLEGAL_CTX_OPERATION_MESSAGE)
+            raise ctx._error
+
+    def abort_operation(message=None):
+        _validate_legal_action()
+        ctx._error = exceptions.ScriptException(message=message, retry=False)
+        return ctx._error
+    task.abort = abort_operation
+
+    def retry_operation(message=None, retry_interval=None):
+        _validate_legal_action()
+        ctx._error = exceptions.ScriptException(message=message,
+                                                retry=True,
+                                                retry_interval=retry_interval)
+        return ctx._error
+    task.retry = retry_operation
+
+
+def check_error(ctx, error_check_func=None, reraise=False):
+    _error = ctx._error
+    # this happens when more than 1 ctx return action is invoked
+    if isinstance(_error, RuntimeError):
+        ctx.task._original_abort(str(_error))
+    elif isinstance(_error, exceptions.ScriptException):
+        if _error.retry:
+            ctx.task._original_retry(_error.message, _error.retry_interval)
+        else:
+            ctx.task._original_abort(_error.message)
+    if error_check_func:
+        error_check_func()
+    if reraise:
+        raise  # pylint: disable=misplaced-bare-raise
+    return _error

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/constants.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/constants.py b/aria/orchestrator/execution_plugin/constants.py
new file mode 100644
index 0000000..0b54dbe
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/constants.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from . import exceptions
+
+# related to local
+PYTHON_SCRIPT_FILE_EXTENSION = '.py'
+POWERSHELL_SCRIPT_FILE_EXTENSION = '.ps1'
+DEFAULT_POWERSHELL_EXECUTABLE = 'powershell'
+
+# related to both local and ssh
+ILLEGAL_CTX_OPERATION_MESSAGE = 'ctx may only abort or retry once'
+
+# related to ssh
+DEFAULT_BASE_DIR = '/tmp/aria-ctx'
+FABRIC_ENV_DEFAULTS = {
+    'connection_attempts': 5,
+    'timeout': 10,
+    'forward_agent': False,
+    'abort_on_prompts': True,
+    'keepalive': 0,
+    'linewise': False,
+    'pool_size': 0,
+    'skip_bad_hosts': False,
+    'status': False,
+    'disable_known_hosts': True,
+    'combine_stderr': True,
+    'abort_exception': exceptions.TaskException,
+}
+VALID_FABRIC_GROUPS = set([
+    'status',
+    'aborts',
+    'warnings',
+    'running',
+    'stdout',
+    'stderr',
+    'user',
+    'everything'
+])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py b/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py
new file mode 100644
index 0000000..7571c15
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from . import server, client

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ctx_proxy/client.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
new file mode 100644
index 0000000..d965a5e
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
@@ -0,0 +1,105 @@
+#! /usr/bin/env python
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import argparse
+import json
+import os
+import sys
+import urllib2
+
+
+# Environment variable for the socket url (used by clients to locate the socket)
+CTX_SOCKET_URL = 'CTX_SOCKET_URL'
+
+
+class _RequestError(RuntimeError):
+
+    def __init__(self, ex_message, ex_type, ex_traceback):
+        super(_RequestError, self).__init__(self, '{0}: {1}'.format(ex_type, 
ex_message))
+        self.ex_type = ex_type
+        self.ex_message = ex_message
+        self.ex_traceback = ex_traceback
+
+
+def _http_request(socket_url, request, timeout):
+    response = urllib2.urlopen(
+        url=socket_url,
+        data=json.dumps(request),
+        timeout=timeout)
+    if response.code != 200:
+        raise RuntimeError('Request failed: {0}'.format(response))
+    return json.loads(response.read())
+
+
+def _client_request(socket_url, args, timeout):
+    response = _http_request(
+        socket_url=socket_url,
+        request={'args': args},
+        timeout=timeout)
+    payload = response['payload']
+    response_type = response.get('type')
+    if response_type == 'error':
+        ex_type = payload['type']
+        ex_message = payload['message']
+        ex_traceback = payload['traceback']
+        raise _RequestError(ex_message, ex_type, ex_traceback)
+    elif response_type == 'stop_operation':
+        raise SystemExit(payload['message'])
+    else:
+        return payload
+
+
+def _parse_args(args):
+    parser = argparse.ArgumentParser()
+    parser.add_argument('-t', '--timeout', type=int, default=30)
+    parser.add_argument('--socket-url', default=os.environ.get(CTX_SOCKET_URL))
+    parser.add_argument('--json-arg-prefix', default='@')
+    parser.add_argument('-j', '--json-output', action='store_true')
+    parser.add_argument('args', nargs='*')
+    args = parser.parse_args(args=args)
+    if not args.socket_url:
+        raise RuntimeError('Missing CTX_SOCKET_URL environment variable '
+                           'or socket_url command line argument. (ctx is 
supposed to be executed '
+                           'within an operation context)')
+    return args
+
+
+def _process_args(json_prefix, args):
+    processed_args = []
+    for arg in args:
+        if arg.startswith(json_prefix):
+            arg = json.loads(arg[1:])
+        processed_args.append(arg)
+    return processed_args
+
+
+def main(args=None):
+    args = _parse_args(args)
+    response = _client_request(
+        socket_url=args.socket_url,
+        args=_process_args(args.json_arg_prefix, args.args),
+        timeout=args.timeout)
+    if args.json_output:
+        response = json.dumps(response)
+    else:
+        if not response:
+            response = ''
+        response = str(response)
+    sys.stdout.write(response)
+
+
+if __name__ == '__main__':
+    main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
new file mode 100644
index 0000000..2782ae3
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -0,0 +1,239 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import collections
+import json
+import re
+import socket
+import threading
+import traceback
+import Queue
+import StringIO
+import wsgiref.simple_server
+
+import bottle
+
+from .. import exceptions
+
+
+class CtxProxy(object):
+
+    def __init__(self, ctx):
+        self.ctx = ctx
+        self.port = _get_unused_port()
+        self.socket_url = 'http://localhost:{0}'.format(self.port)
+        self.server = None
+        self._started = Queue.Queue(1)
+        self.thread = self._start_server()
+        self._started.get(timeout=5)
+
+    def _start_server(self):
+        proxy = self
+
+        class BottleServerAdapter(bottle.ServerAdapter):
+            def run(self, app):
+                class Server(wsgiref.simple_server.WSGIServer):
+                    allow_reuse_address = True
+
+                    def handle_error(self, request, client_address):
+                        pass
+
+                class Handler(wsgiref.simple_server.WSGIRequestHandler):
+                    def address_string(self):
+                        return self.client_address[0]
+
+                    def log_request(*args, **kwargs):  # pylint: 
disable=no-method-argument
+                        if not self.quiet:
+                            return 
wsgiref.simple_server.WSGIRequestHandler.log_request(*args,
+                                                                               
         **kwargs)
+                server = wsgiref.simple_server.make_server(
+                    host=self.host,
+                    port=self.port,
+                    app=app,
+                    server_class=Server,
+                    handler_class=Handler)
+                proxy.server = server
+                proxy._started.put(True)
+                server.serve_forever(poll_interval=0.1)
+
+        def serve():
+            bottle_app = bottle.Bottle()
+            bottle_app.post('/', callback=self._request_handler)
+            bottle.run(
+                app=bottle_app,
+                host='localhost',
+                port=self.port,
+                quiet=True,
+                server=BottleServerAdapter)
+        thread = threading.Thread(target=serve)
+        thread.daemon = True
+        thread.start()
+        return thread
+
+    def close(self):
+        if self.server:
+            self.server.shutdown()
+            self.server.server_close()
+
+    def _request_handler(self):
+        request = bottle.request.body.read()  # pylint: disable=no-member
+        response = self._process(request)
+        return bottle.LocalResponse(
+            body=response,
+            status=200,
+            headers={'content-type': 'application/json'})
+
+    def _process(self, request):
+        try:
+            typed_request = json.loads(request)
+            args = typed_request['args']
+            payload = _process_ctx_request(self.ctx, args)
+            result_type = 'result'
+            if isinstance(payload, exceptions.ScriptException):
+                payload = dict(message=str(payload))
+                result_type = 'stop_operation'
+            result = json.dumps({
+                'type': result_type,
+                'payload': payload
+            })
+        except Exception as e:
+            traceback_out = StringIO.StringIO()
+            traceback.print_exc(file=traceback_out)
+            payload = {
+                'type': type(e).__name__,
+                'message': str(e),
+                'traceback': traceback_out.getvalue()
+            }
+            result = json.dumps({
+                'type': 'error',
+                'payload': payload
+            })
+        return result
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args, **kwargs):
+        self.close()
+
+
+def _process_ctx_request(ctx, args):
+    current = ctx
+    num_args = len(args)
+    index = 0
+    while index < num_args:
+        arg = args[index]
+        attr = _desugar_attr(current, arg)
+        if attr:
+            current = getattr(current, attr)
+        elif isinstance(current, collections.MutableMapping):
+            key = arg
+            path_dict = _PathDictAccess(current)
+            if index + 1 == num_args:
+                # read dict prop by path
+                value = path_dict.get(key)
+                current = value
+            elif index + 2 == num_args:
+                # set dict prop by path
+                value = args[index + 1]
+                path_dict.set(key, value)
+                current = None
+            else:
+                raise RuntimeError('Illegal argument while accessing dict')
+            break
+        elif callable(current):
+            kwargs = {}
+            remaining_args = args[index:]
+            if isinstance(remaining_args[-1], collections.MutableMapping):
+                kwargs = remaining_args[-1]
+                remaining_args = remaining_args[:-1]
+            current = current(*remaining_args, **kwargs)
+            break
+        else:
+            raise RuntimeError('{0} cannot be processed in {1}'.format(arg, 
args))
+        index += 1
+    if callable(current):
+        current = current()
+    return current
+
+
+def _desugar_attr(obj, attr):
+    if not isinstance(attr, basestring):
+        return None
+    if hasattr(obj, attr):
+        return attr
+    attr = attr.replace('-', '_')
+    if hasattr(obj, attr):
+        return attr
+    return None
+
+
+class _PathDictAccess(object):
+    pattern = re.compile(r"(.+)\[(\d+)\]")
+
+    def __init__(self, obj):
+        self.obj = obj
+
+    def set(self, prop_path, value):
+        obj, prop_name = self._get_parent_obj_prop_name_by_path(prop_path)
+        obj[prop_name] = value
+
+    def get(self, prop_path):
+        value = self._get_object_by_path(prop_path)
+        return value
+
+    def _get_object_by_path(self, prop_path, fail_on_missing=True):
+        # when setting a nested object, make sure to also set all the
+        # intermediate path objects
+        current = self.obj
+        for prop_segment in prop_path.split('.'):
+            match = self.pattern.match(prop_segment)
+            if match:
+                index = int(match.group(2))
+                property_name = match.group(1)
+                if property_name not in current:
+                    self._raise_illegal(prop_path)
+                if not isinstance(current[property_name], list):
+                    self._raise_illegal(prop_path)
+                current = current[property_name][index]
+            else:
+                if prop_segment not in current:
+                    if fail_on_missing:
+                        self._raise_illegal(prop_path)
+                    else:
+                        current[prop_segment] = {}
+                current = current[prop_segment]
+        return current
+
+    def _get_parent_obj_prop_name_by_path(self, prop_path):
+        split = prop_path.split('.')
+        if len(split) == 1:
+            return self.obj, prop_path
+        parent_path = '.'.join(split[:-1])
+        parent_obj = self._get_object_by_path(parent_path, 
fail_on_missing=False)
+        prop_name = split[-1]
+        return parent_obj, prop_name
+
+    @staticmethod
+    def _raise_illegal(prop_path):
+        raise RuntimeError('illegal path: {0}'.format(prop_path))
+
+
+def _get_unused_port():
+    sock = socket.socket()
+    sock.bind(('127.0.0.1', 0))
+    _, port = sock.getsockname()
+    sock.close()
+    return port

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/environment_globals.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/environment_globals.py b/aria/orchestrator/execution_plugin/environment_globals.py
new file mode 100644
index 0000000..27311f0
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/environment_globals.py
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+def create_initial_globals(path):
+    """ emulates a `globals()` call in a freshly loaded module
+
+    The implementation of this function is likely to raise a couple of
+    questions. If you read the implementation and nothing bothered you, feel
+    free to skip the rest of this docstring.
+
+    First, why is this function in its own module and not, say, in the same
+    module of the other environment-related functions?
+    Second, why is it implemented in such a way that copies the globals, then
+    deletes the item that represents this function, and then changes some
+    other entries?
+
+    Well, these two questions can be answered with one (elaborate) explanation.
+    If this function was in the same module with the other environment-related
+    functions, then we would have had to delete more items in globals than just
+    `create_initial_globals`. That is because all of the other function names
+    would also be in globals, and since there is no built-in mechanism that
+    returns the names of the user-defined objects, this approach is quite an
+    overkill.
+
+    - But why do we rely on the copy-existing-globals-and-delete-entries
+    method, when it seems to force us to put `create_initial_globals` in its
+    own file?
+
+    Well, because there is no easier method of creating globals of a newly
+    loaded module.
+
+    - How about hard coding a 'global' dict? It seems that there are very few
+    entries: __doc__, __file__, __name__, __package__ (but don't forget
+    __builtins__).
+
+    That would be coupling our implementation to a specific `globals`
+    implementation. What if `globals` were to change?
+    """
+    copied_globals = globals().copy()
+    copied_globals.update({
+        '__doc__': 'Dynamically executed script',
+        '__file__': path,
+        '__name__': '__main__',
+        '__package__': None
+    })
+    del copied_globals[create_initial_globals.__name__]
+    return copied_globals

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/exceptions.py 
b/aria/orchestrator/execution_plugin/exceptions.py
new file mode 100644
index 0000000..22fa9a9
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/exceptions.py
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class ProcessException(Exception):
+
+    def __init__(self, stderr=None, stdout=None, command=None, exit_code=None):
+        super(ProcessException, self).__init__(stderr)
+        self.command = command
+        self.exit_code = exit_code
+        self.stdout = stdout
+        self.stderr = stderr
+
+
+class TaskException(Exception):
+    pass
+
+
+class ScriptException(Exception):
+
+    def __init__(self, message=None, retry=None, retry_interval=None):
+        super(ScriptException, self).__init__(message)
+        self.retry = retry
+        self.retry_interval = retry_interval

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/local.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/local.py 
b/aria/orchestrator/execution_plugin/local.py
new file mode 100644
index 0000000..bc2d661
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/local.py
@@ -0,0 +1,125 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import subprocess
+import threading
+import StringIO
+
+from . import ctx_proxy
+from . import exceptions
+from . import common
+from . import constants
+from . import environment_globals
+from . import python_script_scope
+
+
+def run_script(ctx, script_path, process, **kwargs):
+    if not script_path:
+        ctx.task.abort('Missing script_path')
+    process = process or {}
+    script_path = common.download_script(ctx, script_path)
+    script_func = _get_run_script_func(script_path, process)
+    return script_func(
+        ctx=ctx,
+        script_path=script_path,
+        process=process,
+        operation_kwargs=kwargs)
+
+
+def _get_run_script_func(script_path, process):
+    if _treat_script_as_python_script(script_path, process):
+        return _eval_script_func
+    else:
+        if _treat_script_as_powershell_script(script_path):
+            process.setdefault('command_prefix', 
constants.DEFAULT_POWERSHELL_EXECUTABLE)
+        return _execute_func
+
+
+def _treat_script_as_python_script(script_path, process):
+    eval_python = process.get('eval_python')
+    script_extension = os.path.splitext(script_path)[1].lower()
+    return (eval_python is True or (script_extension == 
constants.PYTHON_SCRIPT_FILE_EXTENSION and
+                                    eval_python is not False))
+
+
+def _treat_script_as_powershell_script(script_path):
+    script_extension = os.path.splitext(script_path)[1].lower()
+    return script_extension == constants.POWERSHELL_SCRIPT_FILE_EXTENSION
+
+
+def _eval_script_func(script_path, ctx, operation_kwargs, **_):
+    with python_script_scope(operation_ctx=ctx, 
operation_inputs=operation_kwargs):
+        execfile(script_path, 
environment_globals.create_initial_globals(script_path))
+
+
+def _execute_func(script_path, ctx, process, operation_kwargs):
+    os.chmod(script_path, 0755)
+    process = common.create_process_config(
+        script_path=script_path,
+        process=process,
+        operation_kwargs=operation_kwargs)
+    command = process['command']
+    env = os.environ.copy()
+    env.update(process['env'])
+    ctx.logger.info('Executing: {0}'.format(command))
+    common.patch_ctx(ctx)
+    with ctx_proxy.server.CtxProxy(ctx) as proxy:
+        env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
+        running_process = subprocess.Popen(
+            command,
+            shell=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
+            env=env,
+            cwd=process.get('cwd'),
+            bufsize=1,
+            close_fds=not common.is_windows())
+        stdout_consumer = _OutputConsumer(running_process.stdout)
+        stderr_consumer = _OutputConsumer(running_process.stderr)
+        exit_code = running_process.wait()
+    stdout_consumer.join()
+    stderr_consumer.join()
+    ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, 
command))
+
+    def error_check_func():
+        if exit_code:
+            raise exceptions.ProcessException(
+                command=command,
+                exit_code=exit_code,
+                stdout=stdout_consumer.read_output(),
+                stderr=stderr_consumer.read_output())
+    return common.check_error(ctx, error_check_func=error_check_func)
+
+
+class _OutputConsumer(object):
+
+    def __init__(self, out):
+        self._out = out
+        self._buffer = StringIO.StringIO()
+        self._consumer = threading.Thread(target=self._consume_output)
+        self._consumer.daemon = True
+        self._consumer.start()
+
+    def _consume_output(self):
+        for line in iter(self._out.readline, b''):
+            self._buffer.write(line)
+        self._out.close()
+
+    def read_output(self):
+        return self._buffer.getvalue()
+
+    def join(self):
+        self._consumer.join()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/operations.py 
b/aria/orchestrator/execution_plugin/operations.py
new file mode 100644
index 0000000..5effa8a
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/operations.py
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria.orchestrator import operation
+from . import local as local_operations
+from .ssh import operations as ssh_operations
+
+
+@operation
+def run_script_locally(ctx,
+                       script_path,
+                       process=None,
+                       **kwargs):
+    return local_operations.run_script(
+        ctx=ctx,
+        script_path=script_path,
+        process=process,
+        **kwargs)
+
+
+@operation
+def run_script_with_ssh(ctx,
+                        script_path,
+                        fabric_env=None,
+                        process=None,
+                        use_sudo=False,
+                        hide_output=None,
+                        **kwargs):
+    return ssh_operations.run_script(
+        ctx=ctx,
+        script_path=script_path,
+        fabric_env=fabric_env,
+        process=process,
+        use_sudo=use_sudo,
+        hide_output=hide_output,
+        **kwargs)
+
+
+@operation
+def run_commands_with_ssh(ctx,
+                          commands,
+                          fabric_env=None,
+                          use_sudo=False,
+                          hide_output=None,
+                          **_):
+    return ssh_operations.run_commands(
+        ctx=ctx,
+        commands=commands,
+        fabric_env=fabric_env,
+        use_sudo=use_sudo,
+        hide_output=hide_output)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ssh/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ssh/__init__.py 
b/aria/orchestrator/execution_plugin/ssh/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/ssh/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ssh/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py 
b/aria/orchestrator/execution_plugin/ssh/operations.py
new file mode 100644
index 0000000..7589d42
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/ssh/operations.py
@@ -0,0 +1,192 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import random
+import string
+import tempfile
+import StringIO
+
+import fabric.api
+import fabric.context_managers
+import fabric.contrib.files
+
+from .. import constants
+from .. import exceptions
+from .. import common
+from .. import ctx_proxy
+from . import tunnel
+
+
+_PROXY_CLIENT_PATH = ctx_proxy.client.__file__
+if _PROXY_CLIENT_PATH.endswith('.pyc'):
+    _PROXY_CLIENT_PATH = _PROXY_CLIENT_PATH[:-1]
+
+
+def run_commands(ctx, commands, fabric_env, use_sudo, hide_output, **_):
+    """Runs the provided 'commands' in sequence
+
+    :param commands: a list of commands to run
+    :param fabric_env: fabric configuration
+    """
+    with fabric.api.settings(_hide_output(ctx, groups=hide_output),
+                             **_fabric_env(ctx, fabric_env, warn_only=True)):
+        for command in commands:
+            ctx.logger.info('Running command: {0}'.format(command))
+            run = fabric.api.sudo if use_sudo else fabric.api.run
+            result = run(command)
+            if result.failed:
+                raise exceptions.ProcessException(
+                    command=result.command,
+                    exit_code=result.return_code,
+                    stdout=result.stdout,
+                    stderr=result.stderr)
+
+
+def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, 
**kwargs):
+    process = process or {}
+    paths = _Paths(base_dir=process.get('base_dir', 
constants.DEFAULT_BASE_DIR),
+                   local_script_path=common.download_script(ctx, script_path))
+    with fabric.api.settings(_hide_output(ctx, groups=hide_output),
+                             **_fabric_env(ctx, fabric_env, warn_only=False)):
+        # the remote host must have the ctx before running any fabric scripts
+        if not fabric.contrib.files.exists(paths.remote_ctx_path):
+            # there may be race conditions with other operations that
+            # may be running in parallel, so we pass -p to make sure
+            # we get 0 exit code if the directory already exists
+            fabric.api.run('mkdir -p {0} && mkdir -p 
{1}'.format(paths.remote_scripts_dir,
+                                                                 
paths.remote_work_dir))
+            # this file has to be present before using ctx
+            fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path)
+        _patch_ctx(ctx)
+        process = common.create_process_config(
+            script_path=paths.remote_script_path,
+            process=process,
+            operation_kwargs=kwargs,
+            quote_json_env_vars=True)
+        fabric.api.put(paths.local_script_path, paths.remote_script_path)
+        with ctx_proxy.server.CtxProxy(ctx) as proxy:
+            local_port = proxy.port
+            with fabric.context_managers.cd(process.get('cwd', 
paths.remote_work_dir)):  # pylint: disable=not-context-manager
+                with tunnel.remote(ctx, local_port=local_port) as remote_port:
+                    local_socket_url = proxy.socket_url
+                    remote_socket_url = 
local_socket_url.replace(str(local_port), str(remote_port))
+                    env_script = _write_environment_script_file(
+                        process=process,
+                        paths=paths,
+                        local_socket_url=local_socket_url,
+                        remote_socket_url=remote_socket_url)
+                    fabric.api.put(env_script, paths.remote_env_script_path)
+                    try:
+                        command = 'source {0} && 
{1}'.format(paths.remote_env_script_path,
+                                                             
process['command'])
+                        run = fabric.api.sudo if use_sudo else fabric.api.run
+                        run(command)
+                    except exceptions.TaskException:
+                        return common.check_error(ctx, reraise=True)
+            return common.check_error(ctx)
+
+
+def _patch_ctx(ctx):
+    common.patch_ctx(ctx)
+    original_download_resource = ctx.download_resource
+    original_download_resource_and_render = ctx.download_resource_and_render
+
+    def _download_resource(func, destination, **kwargs):
+        handle, temp_local_path = tempfile.mkstemp()
+        os.close(handle)
+        try:
+            func(destination=temp_local_path, **kwargs)
+            return fabric.api.put(temp_local_path, destination)
+        finally:
+            os.remove(temp_local_path)
+
+    def download_resource(destination, path=None):
+        _download_resource(
+            func=original_download_resource,
+            destination=destination,
+            path=path)
+    ctx.download_resource = download_resource
+
+    def download_resource_and_render(destination, path=None, variables=None):
+        _download_resource(
+            func=original_download_resource_and_render,
+            destination=destination,
+            path=path,
+            variables=variables)
+    ctx.download_resource_and_render = download_resource_and_render
+
+
+def _hide_output(ctx, groups):
+    """ Hides Fabric's output for every 'entity' in `groups` """
+    groups = set(groups or [])
+    if not groups.issubset(constants.VALID_FABRIC_GROUPS):
+        ctx.task.abort('`hide_output` must be a subset of {0} (Provided: {1})'
+                       .format(', '.join(constants.VALID_FABRIC_GROUPS), ', 
'.join(groups)))
+    return fabric.api.hide(*groups)
+
+
+def _fabric_env(ctx, fabric_env, warn_only):
+    """Prepares fabric environment variables configuration"""
+    ctx.logger.debug('Preparing fabric environment...')
+    env = constants.FABRIC_ENV_DEFAULTS.copy()
+    env.update(fabric_env or {})
+    env.setdefault('warn_only', warn_only)
+    if 'host_string' not in env:
+        env['host_string'] = ctx.task.runs_on.ip
+    # validations
+    if not env.get('host_string'):
+        ctx.task.abort('`host_string` not supplied and ip cannot be deduced 
automatically')
+    if not (env.get('password') or env.get('key_filename') or env.get('key')):
+        ctx.task.abort(
+            'Access credentials not supplied '
+            '(you must supply at least one of `key_filename`, `key` or 
`password`)')
+    if not env.get('user'):
+        ctx.task.abort('`user` not supplied')
+    ctx.logger.debug('Environment prepared successfully')
+    return env
+
+
+def _write_environment_script_file(process, paths, local_socket_url, 
remote_socket_url):
+    env_script = StringIO.StringIO()
+    env = process['env']
+    env['PATH'] = '{0}:$PATH'.format(paths.remote_ctx_dir)
+    env['PYTHONPATH'] = '{0}:$PYTHONPATH'.format(paths.remote_ctx_dir)
+    env_script.write('chmod +x {0}\n'.format(paths.remote_script_path))
+    env_script.write('chmod +x {0}\n'.format(paths.remote_ctx_path))
+    env.update({
+        ctx_proxy.client.CTX_SOCKET_URL: remote_socket_url,
+        'LOCAL_{0}'.format(ctx_proxy.client.CTX_SOCKET_URL): local_socket_url
+    })
+    for key, value in env.iteritems():
+        env_script.write('export {0}={1}\n'.format(key, value))
+    return env_script
+
+
+class _Paths(object):
+
+    def __init__(self, base_dir, local_script_path):
+        self.local_script_path = local_script_path
+        self.remote_ctx_dir = base_dir
+        self.base_script_path = os.path.basename(self.local_script_path)
+        self.remote_ctx_path = '{0}/ctx'.format(self.remote_ctx_dir)
+        self.remote_scripts_dir = '{0}/scripts'.format(self.remote_ctx_dir)
+        self.remote_work_dir = '{0}/work'.format(self.remote_ctx_dir)
+        random_suffix = ''.join(random.choice(string.ascii_lowercase + 
string.digits)
+                                for _ in range(8))
+        remote_path_suffix = '{0}-{1}'.format(self.base_script_path, 
random_suffix)
+        self.remote_env_script_path = 
'{0}/env-{1}'.format(self.remote_scripts_dir,
+                                                           remote_path_suffix)
+        self.remote_script_path = '{0}/{1}'.format(self.remote_scripts_dir, 
remote_path_suffix)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ssh/tunnel.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ssh/tunnel.py 
b/aria/orchestrator/execution_plugin/ssh/tunnel.py
new file mode 100644
index 0000000..6fc8d54
--- /dev/null
+++ b/aria/orchestrator/execution_plugin/ssh/tunnel.py
@@ -0,0 +1,91 @@
+# This implementation was copied from the Fabric project directly:
+# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L486
+# The purpose was to remove the rtunnel creation printouts here:
+# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L547
+
+
+import contextlib
+import select
+import socket
+
+import fabric.api
+import fabric.state
+import fabric.thread_handling
+
+
+@contextlib.contextmanager
+def remote(ctx, local_port, remote_port=0, local_host='localhost', 
remote_bind_address='127.0.0.1'):
+    """Create a tunnel forwarding a locally-visible port to the remote 
target."""
+    sockets = []
+    channels = []
+    thread_handlers = []
+
+    def accept(channel, *args, **kwargs):
+        # This seemingly innocent statement seems to be doing nothing
+        # but the truth is far from it!
+        # calling fileno() on a paramiko channel the first time, creates
+        # the required plumbing to make the channel valid for select.
+        # While this would generally happen implicitly inside the _forwarder
+        # function when select is called, it may already be too late and may
+        # cause the select loop to hang.
+        # Specifically, when new data arrives to the channel, a flag is set
+        # on an "event" object which is what makes the select call work.
+        # problem is this will only happen if the event object is not None
+        # and it will be not-None only after channel.fileno() has been called
+        # for the first time. If we wait until _forwarder calls select for the
+        # first time it may be after initial data has reached the channel.
+        # calling it explicitly here in the paramiko transport main event loop
+        # guarantees this will not happen.
+        channel.fileno()
+
+        channels.append(channel)
+        sock = socket.socket()
+        sockets.append(sock)
+
+        try:
+            sock.connect((local_host, local_port))
+        except Exception as e:
+            try:
+                channel.close()
+            except Exception as ex2:
+                close_error = ' (While trying to close channel: 
{0})'.format(ex2)
+            else:
+                close_error = ''
+            ctx.task.abort('[{0}] rtunnel: cannot connect to {1}:{2} ({3}){4}'
+                           .format(fabric.api.env.host_string, local_host, 
local_port, e,
+                                   close_error))
+
+        thread_handler = fabric.thread_handling.ThreadHandler('fwd', 
_forwarder, channel, sock)
+        thread_handlers.append(thread_handler)
+
+    transport = 
fabric.state.connections[fabric.api.env.host_string].get_transport()
+    remote_port = transport.request_port_forward(
+        remote_bind_address, remote_port, handler=accept)
+
+    try:
+        yield remote_port
+    finally:
+        for sock, chan, thread_handler in zip(sockets, channels, 
thread_handlers):
+            sock.close()
+            chan.close()
+            thread_handler.thread.join()
+            thread_handler.raise_if_needed()
+        transport.cancel_port_forward(remote_bind_address, remote_port)
+
+
+def _forwarder(chan, sock):
+    # Bidirectionally forward data between a socket and a Paramiko channel.
+    while True:
+        read = select.select([sock, chan], [], [])[0]
+        if sock in read:
+            data = sock.recv(1024)
+            if len(data) == 0:
+                break
+            chan.sendall(data)  # send() may transmit only part of the buffer
+        if chan in read:
+            data = chan.recv(1024)
+            if len(data) == 0:
+                break
+            sock.sendall(data)  # send() may transmit only part of the buffer
+    chan.close()
+    sock.close()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py 
b/aria/orchestrator/workflows/api/task.py
index 70324a6..8d93837 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -68,7 +68,8 @@ class OperationTask(BaseTask):
                  retry_interval=None,
                  ignore_failure=None,
                  inputs=None,
-                 plugin=None):
+                 plugin=None,
+                 runs_on=None):
         """
         Creates an operation task using the name, details, node instance and 
any additional kwargs.
         :param name: the operation of the name.
@@ -89,6 +90,7 @@ class OperationTask(BaseTask):
                                if retry_interval is None else retry_interval)
         self.ignore_failure = (self.workflow_context._task_ignore_failure
                                if ignore_failure is None else ignore_failure)
+        self.runs_on = runs_on
 
     @classmethod
     def node_instance(cls, instance, name, inputs=None, *args, **kwargs):
@@ -104,6 +106,7 @@ class OperationTask(BaseTask):
                              operation_details=instance.node.operations[name],
                              inputs=inputs,
                              plugins=instance.node.plugins or [],
+                             runs_on=model.Task.RUNS_ON_NODE,
                              *args,
                              **kwargs)
 
@@ -126,18 +129,22 @@ class OperationTask(BaseTask):
         operation_details = getattr(instance.relationship, operation_end)[name]
         if operation_end == cls.SOURCE_OPERATION:
             plugins = instance.relationship.source_node.plugins
+            runs_on = model.Task.RUNS_ON_SOURCE
         else:
             plugins = instance.relationship.target_node.plugins
+            runs_on = model.Task.RUNS_ON_TARGET
         return cls._instance(instance=instance,
                              name=name,
                              operation_details=operation_details,
                              inputs=inputs,
                              plugins=plugins or [],
+                             runs_on=runs_on,
                              *args,
                              **kwargs)
 
     @classmethod
-    def _instance(cls, instance, name, operation_details, inputs, plugins, 
*args, **kwargs):
+    def _instance(cls, instance, name, operation_details, inputs, plugins, 
runs_on, *args,
+                  **kwargs):
         operation_mapping = operation_details.get('operation')
         operation_inputs = operation_details.get('inputs', {})
         operation_inputs.update(inputs or {})
@@ -151,6 +158,7 @@ class OperationTask(BaseTask):
                    operation_mapping=operation_mapping,
                    inputs=operation_inputs,
                    plugin=plugin,
+                   runs_on=runs_on,
                    *args,
                    **kwargs)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py 
b/aria/orchestrator/workflows/core/engine.py
index 47269a3..fd83614 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -52,7 +52,6 @@ class Engine(logger.LoggerMixin):
         """
         try:
             events.start_workflow_signal.send(self._workflow_context)
-            cancel = False
             while True:
                 cancel = self._is_cancel()
                 if cancel:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py 
b/aria/orchestrator/workflows/core/task.py
index 67be2ea..1deb66a 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -135,7 +135,8 @@ class OperationTask(BaseTask):
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
             plugin=plugins[0] if plugins else None,
-            execution=self._workflow_context.execution
+            execution=self._workflow_context.execution,
+            runs_on=api_task.runs_on
         )
         self._workflow_context.model.task.put(operation_task)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py 
b/aria/orchestrator/workflows/executor/process.py
index 51596c3..2cc9178 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -40,6 +40,7 @@ import Queue
 import jsonpickle
 
 from aria.utils import imports
+from aria.utils import exceptions
 from aria.orchestrator.workflows.executor import base
 from aria.orchestrator.context import serialization
 from aria.storage import instrumentation
@@ -255,7 +256,7 @@ class _Messenger(object):
             data = jsonpickle.dumps({
                 'type': type,
                 'task_id': self.task_id,
-                'exception': exception,
+                'exception': exceptions.wrap_if_needed(exception),
                 'tracked_changes': tracked_changes
             })
             sock.send(struct.pack(_INT_FMT, len(data)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/storage/base_model.py
----------------------------------------------------------------------
diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py
index a2bbb60..29973a2 100644
--- a/aria/storage/base_model.py
+++ b/aria/storage/base_model.py
@@ -695,6 +695,11 @@ class TaskBase(ModelMixin):
     WAIT_STATES = [PENDING, RETRYING]
     END_STATES = [SUCCESS, FAILED]
 
+    RUNS_ON_SOURCE = 'source'
+    RUNS_ON_TARGET = 'target'
+    RUNS_ON_NODE = 'node'
+    RUNS_ON = (RUNS_ON_NODE, RUNS_ON_SOURCE, RUNS_ON_TARGET)
+
     @orm.validates('max_attempts')
     def validate_max_attempts(self, _, value):                                  # pylint: disable=no-self-use
         """Validates that max attempts is either -1 or a positive number"""
@@ -718,6 +723,7 @@ class TaskBase(ModelMixin):
     # Operation specific fields
     operation_mapping = Column(String)
     inputs = Column(Dict)
+    _runs_on = Column(Enum(*RUNS_ON, name='runs_on'), name='runs_on')
 
     @property
     def actor(self):
@@ -727,13 +733,23 @@ class TaskBase(ModelMixin):
         """
         return self.node_instance or self.relationship_instance
 
+    @property
+    def runs_on(self):
+        if self._runs_on == self.RUNS_ON_NODE:
+            return self.node_instance
+        elif self._runs_on == self.RUNS_ON_SOURCE:
+            return self.relationship_instance.source_node_instance  # pylint: disable=no-member
+        elif self._runs_on == self.RUNS_ON_TARGET:
+            return self.relationship_instance.target_node_instance  # pylint: disable=no-member
+        return None
+
     @classmethod
-    def as_node_instance(cls, instance, **kwargs):
-        return cls(node_instance=instance, **kwargs)
+    def as_node_instance(cls, instance, runs_on, **kwargs):
+        return cls(node_instance=instance, _runs_on=runs_on, **kwargs)
 
     @classmethod
-    def as_relationship_instance(cls, instance, **kwargs):
-        return cls(relationship_instance=instance, **kwargs)
+    def as_relationship_instance(cls, instance, runs_on, **kwargs):
+        return cls(relationship_instance=instance, _runs_on=runs_on, **kwargs)
 
     @staticmethod
     def abort(message=None):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/storage/filesystem_rapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py
index c6b3a81..eb30e0b 100644
--- a/aria/storage/filesystem_rapi.py
+++ b/aria/storage/filesystem_rapi.py
@@ -132,7 +132,7 @@ class FileSystemResourceAPI(api.ResourceAPI):
         if os.path.isfile(resource):
             shutil.copy2(resource, destination)
         else:
-            dir_util.copy_tree(resource, destination)                                     # pylint: disable=no-member
+            dir_util.copy_tree(resource, destination)  # pylint: disable=no-member
 
     def upload(self, entry_id, source, path=None, **_):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/utils/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/utils/exceptions.py b/aria/utils/exceptions.py
index 0370bb3..a19eb78 100644
--- a/aria/utils/exceptions.py
+++ b/aria/utils/exceptions.py
@@ -16,6 +16,8 @@
 import sys
 import linecache
 
+import jsonpickle
+
 from clint.textui import indent
 from .console import (puts, Colored)
 
@@ -39,6 +41,7 @@ def print_exception(e, full=True, cause=False, traceback=None):
         traceback = e.cause_traceback if hasattr(e, 'cause_traceback') else None
         print_exception(e.cause, full=full, cause=True, traceback=traceback)
 
+
 def print_traceback(traceback=None):
     """
     Prints the traceback with nice colors and such.
@@ -62,3 +65,19 @@ def print_traceback(traceback=None):
                 with indent(2):
                     puts(Colored.black(line.strip()))
         traceback = traceback.tb_next
+
+
+class _WrappedException(Exception):
+
+    def __init__(self, exception_type, exception_str):
+        super(_WrappedException, self).__init__(exception_type, exception_str)
+        self.exception_type = exception_type
+        self.exception_str = exception_str
+
+
+def wrap_if_needed(exception):
+    try:
+        jsonpickle.loads(jsonpickle.dumps(exception))
+        return exception
+    except BaseException:
+        return _WrappedException(type(exception).__name__, str(exception))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 0005a5e..055bafb 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,3 +26,6 @@ CacheControl[filecache]==0.11.6
 clint==0.5.1
 SQLAlchemy==1.1.4
 wagon==0.5.0
+bottle==0.12.11
+six==1.10.0
+Fabric==1.13.1

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/setup.py
----------------------------------------------------------------------
diff --git a/setup.py b/setup.py
index 112f13e..5a3d016 100644
--- a/setup.py
+++ b/setup.py
@@ -44,6 +44,11 @@ except IOError:
     install_requires = []
 
 
+console_scripts = ['aria = aria.cli.cli:main']
+if os.environ.get('INSTALL_CTX'):
+    console_scripts.append('ctx = aria.orchestrator.execution_plugin.ctx_proxy.client:main')
+
+
 setup(
     name=_PACKAGE_NAME,
     version=version,
@@ -77,8 +82,6 @@ setup(
     zip_safe=False,
     install_requires=install_requires,
     entry_points={
-        'console_scripts': [
-            'aria = aria.cli.cli:main'
-        ]
+        'console_scripts': console_scripts
     }
 )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/tests/orchestrator/execution_plugin/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/__init__.py b/tests/orchestrator/execution_plugin/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/orchestrator/execution_plugin/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/tests/orchestrator/execution_plugin/test_common.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_common.py b/tests/orchestrator/execution_plugin/test_common.py
new file mode 100644
index 0000000..4e2f714
--- /dev/null
+++ b/tests/orchestrator/execution_plugin/test_common.py
@@ -0,0 +1,191 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from collections import namedtuple
+
+import requests
+import pytest
+
+from aria.storage import model
+from aria.orchestrator import exceptions
+from aria.orchestrator.execution_plugin import common
+
+
+class TestDownloadScript(object):
+
+    @pytest.fixture(autouse=True)
+    def patch_requests(self, mocker):
+        def _mock_requests_get(url):
+            response = namedtuple('Response', 'text status_code')
+            return response(url, self.status_code)
+        self.status_code = 200
+        mocker.patch.object(requests, 'get', _mock_requests_get)
+
+    def _test_url(self, url):
+        class Ctx(object):
+            task = model.Task
+
+        script_path = url
+        result = common.download_script(Ctx, script_path)
+        with open(result) as f:
+            assert script_path == f.read()
+        assert result.endswith('-some_script.py')
+
+    def test_http_url(self):
+        self._test_url('http://localhost/some_script.py')
+
+    def test_https_url(self):
+        self._test_url('https://localhost/some_script.py')
+
+    def test_url_status_code_404(self):
+        self.status_code = 404
+        with pytest.raises(exceptions.TaskAbortException) as exc_ctx:
+            self.test_http_url()
+        exception = exc_ctx.value
+        assert 'status code: 404' in str(exception)
+
+    def test_blueprint_resource(self):
+        test_script_path = 'my_script.py'
+
+        class Ctx(object):
+            @staticmethod
+            def download_resource(destination, path):
+                assert path == test_script_path
+                return destination
+        result = common.download_script(Ctx, test_script_path)
+        assert result.endswith(test_script_path)
+
+
+class TestCreateProcessConfig(object):
+
+    def test_plain_command(self):
+        script_path = 'path'
+        process = common.create_process_config(
+            script_path=script_path,
+            process={},
+            operation_kwargs={})
+        assert process['command'] == script_path
+
+    def test_command_with_args(self):
+        script_path = 'path'
+        process = {'args': [1, 2, 3]}
+        process = common.create_process_config(
+            script_path=script_path,
+            process=process,
+            operation_kwargs={})
+        assert process['command'] == '{0} 1 2 3'.format(script_path)
+
+    def test_command_prefix(self):
+        script_path = 'path'
+        command_prefix = 'prefix'
+        process = {'command_prefix': command_prefix}
+        process = common.create_process_config(
+            script_path=script_path,
+            process=process,
+            operation_kwargs={})
+        assert process['command'] == '{0} {1}'.format(command_prefix, script_path)
+
+    def test_command_with_args_and_prefix(self):
+        script_path = 'path'
+        command_prefix = 'prefix'
+        process = {'command_prefix': command_prefix,
+                   'args': [1, 2, 3]}
+        process = common.create_process_config(
+            script_path=script_path,
+            process=process,
+            operation_kwargs={})
+        assert process['command'] == '{0} {1} 1 2 3'.format(command_prefix, script_path)
+
+    def test_ctx_is_removed(self):
+        process = common.create_process_config(
+            script_path='',
+            process={},
+            operation_kwargs={'ctx': 1})
+        assert 'ctx' not in process['env']
+
+    def test_env_passed_explicitly(self):
+        env = {'one': '1', 'two': '2'}
+        process = common.create_process_config(
+            script_path='',
+            process={'env': env},
+            operation_kwargs={})
+        assert process['env'] == env
+
+    def test_env_populated_from_operation_kwargs(self):
+        operation_kwargs = {'one': '1', 'two': '2'}
+        process = common.create_process_config(
+            script_path='',
+            process={},
+            operation_kwargs=operation_kwargs)
+        assert process['env'] == operation_kwargs
+
+    def test_env_merged_from_operation_kwargs_and_process(self):
+        operation_kwargs = {'one': '1', 'two': '2'}
+        env = {'three': '3', 'four': '4'}
+        process = common.create_process_config(
+            script_path='',
+            process={'env': env},
+            operation_kwargs=operation_kwargs)
+        assert process['env'] == dict(operation_kwargs.items() + env.items())
+
+    def test_process_env_gets_precedence_over_operation_kwargs(self):
+        operation_kwargs = {'one': 'from_kwargs'}
+        env = {'one': 'from_env_process'}
+        process = common.create_process_config(
+            script_path='',
+            process={'env': env},
+            operation_kwargs=operation_kwargs)
+        assert process['env'] == env
+
+    def test_json_env_vars(self):
+        operation_kwargs = {'a_dict': {'key': 'value'},
+                            'a_list': ['a', 'b', 'c'],
+                            'a_tuple': (4, 5, 6),
+                            'a_bool': True}
+        process = common.create_process_config(
+            script_path='',
+            process={},
+            operation_kwargs=operation_kwargs)
+        assert process['env'] == {'a_dict': '{"key": "value"}',
+                                  'a_list': '["a", "b", "c"]',
+                                  'a_tuple': '[4, 5, 6]',
+                                  'a_bool': 'true'}
+
+    def test_quote_json_env_vars(self):
+        operation_kwargs = {'one': []}
+        process = common.create_process_config(
+            script_path='',
+            process={},
+            operation_kwargs=operation_kwargs,
+            quote_json_env_vars=True)
+        assert process['env']['one'] == "'[]'"
+
+    @pytest.mark.skipif(not common.is_windows(), reason='windows')
+    def test_env_keys_converted_to_string_on_windows(self):
+        env = {u'one': '1'}
+        process = common.create_process_config(
+            script_path='',
+            process={'env': env},
+            operation_kwargs={})
+        assert isinstance(process['env'].keys()[0], str)
+
+    @pytest.mark.skipif(not common.is_windows(), reason='windows')
+    def test_env_values_quotes_are_escaped_on_windows(self):
+        env = {'one': '"hello"'}
+        process = common.create_process_config(
+            script_path='',
+            process={'env': env},
+            operation_kwargs={})
+        assert process['env']['one'] == '\\"hello\\"'

Reply via email to