ARIA-46 Execution plugin
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/3e0a578d Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/3e0a578d Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/3e0a578d Branch: refs/heads/ARIA-46-execution-plugin Commit: 3e0a578d314f82c4425e2c7749614bbe3f4bdd5b Parents: 385b209 Author: Dan Kilman <d...@gigaspaces.com> Authored: Tue Jan 3 17:00:46 2017 +0200 Committer: Dan Kilman <d...@gigaspaces.com> Committed: Thu Jan 19 14:15:56 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/context/common.py | 4 +- aria/orchestrator/context/operation.py | 5 +- aria/orchestrator/exceptions.py | 2 +- aria/orchestrator/execution_plugin/__init__.py | 33 ++ aria/orchestrator/execution_plugin/common.py | 145 +++++ aria/orchestrator/execution_plugin/constants.py | 51 ++ .../execution_plugin/ctx_proxy/__init__.py | 16 + .../execution_plugin/ctx_proxy/client.py | 105 ++++ .../execution_plugin/ctx_proxy/server.py | 239 ++++++++ .../execution_plugin/environment_globals.py | 60 ++ .../orchestrator/execution_plugin/exceptions.py | 36 ++ aria/orchestrator/execution_plugin/local.py | 125 ++++ .../orchestrator/execution_plugin/operations.py | 63 ++ .../execution_plugin/ssh/__init__.py | 14 + .../execution_plugin/ssh/operations.py | 192 ++++++ .../orchestrator/execution_plugin/ssh/tunnel.py | 91 +++ aria/orchestrator/workflows/api/task.py | 12 +- aria/orchestrator/workflows/core/engine.py | 1 - aria/orchestrator/workflows/core/task.py | 3 +- aria/orchestrator/workflows/executor/process.py | 3 +- aria/storage/base_model.py | 24 +- aria/storage/filesystem_rapi.py | 2 +- aria/utils/exceptions.py | 19 + requirements.txt | 3 + setup.py | 9 +- tests/orchestrator/execution_plugin/__init__.py | 14 + .../execution_plugin/test_common.py | 191 ++++++ .../execution_plugin/test_ctx_proxy_server.py | 359 ++++++++++++ .../execution_plugin/test_global_ctx.py | 28 + .../orchestrator/execution_plugin/test_local.py | 587 +++++++++++++++++++ tests/orchestrator/execution_plugin/test_ssh.py | 481 +++++++++++++++ tests/orchestrator/workflows/api/test_task.py | 5 + tests/orchestrator/workflows/core/test_task.py | 38 +- tests/orchestrator/workflows/helpers.py | 37 ++ tests/resources/scripts/test_ssh.sh | 81 +++ tests/test_logger.py | 5 +- tests/utils/test_exceptions.py | 73 +++ tox.ini | 7 + 38 files changed, 3137 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 53844e8..691c17d 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -113,9 +113,7 @@ class BaseContext(logger.LoggerMixin): template using the provided variables. ctx is available to the template without providing it explicitly. """ - self.download_resource(destination=destination, path=path) - with open(destination, 'rb') as f: - resource_content = f.read() + resource_content = self.get_resource(path=path) resource_content = self._render_resource(resource_content=resource_content, variables=variables) with open(destination, 'wb') as f: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 19bb73a..b33d107 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -42,6 +42,7 @@ class BaseOperationContext(BaseContext): **kwargs) self._task_id = task_id self._actor_id = actor_id + self._task = None def __repr__(self): details = 'operation_mapping={task.operation_mapping}; ' \ @@ -55,7 +56,9 @@ class BaseOperationContext(BaseContext): The task in the model storage :return: Task model """ - return self.model.task.get(self._task_id) + if not self._task: + self._task = self.model.task.get(self._task_id) + return self._task class NodeOperationContext(BaseOperationContext): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py index bd5238e..c00b66b 100644 --- a/aria/orchestrator/exceptions.py +++ b/aria/orchestrator/exceptions.py @@ -36,7 +36,7 @@ class TaskRetryException(RuntimeError): """ Used internally when ctx.task.retry is called """ - def __init__(self, message, retry_interval): + def __init__(self, message, retry_interval=None): super(TaskRetryException, self).__init__(message) self.retry_interval = retry_interval http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/__init__.py b/aria/orchestrator/execution_plugin/__init__.py new file mode 100644 index 0000000..372022f --- /dev/null +++ b/aria/orchestrator/execution_plugin/__init__.py @@ -0,0 +1,33 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from contextlib import contextmanager + +# Populated during execution of python scripts +ctx = None +inputs = None + + +@contextmanager +def python_script_scope(operation_ctx, operation_inputs): + global ctx + global inputs + try: + ctx = operation_ctx + inputs = operation_inputs + yield + finally: + ctx = None + inputs = None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py new file mode 100644 index 0000000..a64a955 --- /dev/null +++ b/aria/orchestrator/execution_plugin/common.py @@ -0,0 +1,145 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import os +import tempfile + +import requests + +from . import constants +from . import exceptions + + +def is_windows(): + return os.name == 'nt' + + +def download_script(ctx, script_path): + split = script_path.split('://') + schema = split[0] + suffix = script_path.split('/')[-1] + file_descriptor, dest_script_path = tempfile.mkstemp(suffix='-{0}'.format(suffix)) + os.close(file_descriptor) + try: + if schema in ['http', 'https']: + response = requests.get(script_path) + if response.status_code == 404: + ctx.task.abort('Failed to download script: {0} (status code: {1})' + .format(script_path, response.status_code)) + content = response.text + with open(dest_script_path, 'wb') as f: + f.write(content) + else: + ctx.download_resource(destination=dest_script_path, path=script_path) + except: + os.remove(dest_script_path) + raise + return dest_script_path + + +def create_process_config(script_path, process, operation_kwargs, quote_json_env_vars=False): + """ + update a process with it's environment variables, and return it. + + Get a dict representing a process and a dict representing the environment + variables. Convert each environment variable to a format of + <string representing the name of the variable> : + <json formatted string representing the value of the variable>. + Finally, update the process with the newly formatted environment variables, + and return the process. + + :param process: a dict representing a process + :type process: dict + :param operation_kwargs: a dict representing environment variables that + should exist in the process' running environment. + :type operation_kwargs: dict + :return: the process updated with its environment variables. + :rtype: dict + """ + process = process or {} + env_vars = operation_kwargs.copy() + if 'ctx' in env_vars: + del env_vars['ctx'] + env_vars.update(process.get('env', {})) + for k, v in env_vars.items(): + if isinstance(v, (dict, list, tuple, bool, int, float)): + env_var_value = json.dumps(v) + if is_windows(): + # These <k,v> environment variables will subsequently + # be used in a subprocess.Popen() call, as the `env` parameter. + # In some windows python versions, if an environment variable + # name is not of type str (e.g. unicode), the Popen call will + # fail. + k = str(k) + # The windows shell removes all double quotes - escape them + # to still be able to pass JSON in env vars to the shell. + env_var_value = env_var_value.replace('"', '\\"') + if quote_json_env_vars: + env_var_value = "'{0}'".format(env_var_value) + env_vars[k] = env_var_value + process['env'] = env_vars + args = process.get('args') + command = script_path + command_prefix = process.get('command_prefix') + if command_prefix: + command = '{0} {1}'.format(command_prefix, command) + if args: + command = ' '.join([command] + [str(a) for a in args]) + process['command'] = command + return process + + +def patch_ctx(ctx): + ctx._error = None + task = ctx.task + task._original_abort = task.abort + task._original_retry = task.retry + + def _validate_legal_action(): + if ctx._error is not None: + ctx._error = RuntimeError(constants.ILLEGAL_CTX_OPERATION_MESSAGE) + raise ctx._error + + def abort_operation(message=None): + _validate_legal_action() + ctx._error = exceptions.ScriptException(message=message, retry=False) + return ctx._error + task.abort = abort_operation + + def retry_operation(message=None, retry_interval=None): + _validate_legal_action() + ctx._error = exceptions.ScriptException(message=message, + retry=True, + retry_interval=retry_interval) + return ctx._error + task.retry = retry_operation + + +def check_error(ctx, error_check_func=None, reraise=False): + _error = ctx._error + # this happens when more than 1 ctx return action is invoked + if isinstance(_error, RuntimeError): + ctx.task._original_abort(str(_error)) + elif isinstance(_error, exceptions.ScriptException): + if _error.retry: + ctx.task._original_retry(_error.message, _error.retry_interval) + else: + ctx.task._original_abort(_error.message) + if error_check_func: + error_check_func() + if reraise: + raise # pylint: disable=misplaced-bare-raise + return _error http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/constants.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/constants.py b/aria/orchestrator/execution_plugin/constants.py new file mode 100644 index 0000000..0b54dbe --- /dev/null +++ b/aria/orchestrator/execution_plugin/constants.py @@ -0,0 +1,51 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import exceptions + +# related to local +PYTHON_SCRIPT_FILE_EXTENSION = '.py' +POWERSHELL_SCRIPT_FILE_EXTENSION = '.ps1' +DEFAULT_POWERSHELL_EXECUTABLE = 'powershell' + +# related to both local and ssh +ILLEGAL_CTX_OPERATION_MESSAGE = 'ctx may only abort or retry once' + +# related to ssh +DEFAULT_BASE_DIR = '/tmp/aria-ctx' +FABRIC_ENV_DEFAULTS = { + 'connection_attempts': 5, + 'timeout': 10, + 'forward_agent': False, + 'abort_on_prompts': True, + 'keepalive': 0, + 'linewise': False, + 'pool_size': 0, + 'skip_bad_hosts': False, + 'status': False, + 'disable_known_hosts': True, + 'combine_stderr': True, + 'abort_exception': exceptions.TaskException, +} +VALID_FABRIC_GROUPS = set([ + 'status', + 'aborts', + 'warnings', + 'running', + 'stdout', + 'stderr', + 'user', + 'everything' +]) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py b/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py new file mode 100644 index 0000000..7571c15 --- /dev/null +++ b/aria/orchestrator/execution_plugin/ctx_proxy/__init__.py @@ -0,0 +1,16 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from . import server, client http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ctx_proxy/client.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py new file mode 100644 index 0000000..d965a5e --- /dev/null +++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py @@ -0,0 +1,105 @@ +#! /usr/bin/env python +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import argparse +import json +import os +import sys +import urllib2 + + +# Environment variable for the socket url (used by clients to locate the socket) +CTX_SOCKET_URL = 'CTX_SOCKET_URL' + + +class _RequestError(RuntimeError): + + def __init__(self, ex_message, ex_type, ex_traceback): + super(_RequestError, self).__init__(self, '{0}: {1}'.format(ex_type, ex_message)) + self.ex_type = ex_type + self.ex_message = ex_message + self.ex_traceback = ex_traceback + + +def _http_request(socket_url, request, timeout): + response = urllib2.urlopen( + url=socket_url, + data=json.dumps(request), + timeout=timeout) + if response.code != 200: + raise RuntimeError('Request failed: {0}'.format(response)) + return json.loads(response.read()) + + +def _client_request(socket_url, args, timeout): + response = _http_request( + socket_url=socket_url, + request={'args': args}, + timeout=timeout) + payload = response['payload'] + response_type = response.get('type') + if response_type == 'error': + ex_type = payload['type'] + ex_message = payload['message'] + ex_traceback = payload['traceback'] + raise _RequestError(ex_message, ex_type, ex_traceback) + elif response_type == 'stop_operation': + raise SystemExit(payload['message']) + else: + return payload + + +def _parse_args(args): + parser = argparse.ArgumentParser() + parser.add_argument('-t', '--timeout', type=int, default=30) + parser.add_argument('--socket-url', default=os.environ.get(CTX_SOCKET_URL)) + parser.add_argument('--json-arg-prefix', default='@') + parser.add_argument('-j', '--json-output', action='store_true') + parser.add_argument('args', nargs='*') + args = parser.parse_args(args=args) + if not args.socket_url: + raise RuntimeError('Missing CTX_SOCKET_URL environment variable ' + 'or socket_url command line argument. (ctx is supposed to be executed ' + 'within an operation context)') + return args + + +def _process_args(json_prefix, args): + processed_args = [] + for arg in args: + if arg.startswith(json_prefix): + arg = json.loads(arg[1:]) + processed_args.append(arg) + return processed_args + + +def main(args=None): + args = _parse_args(args) + response = _client_request( + socket_url=args.socket_url, + args=_process_args(args.json_arg_prefix, args.args), + timeout=args.timeout) + if args.json_output: + response = json.dumps(response) + else: + if not response: + response = '' + response = str(response) + sys.stdout.write(response) + + +if __name__ == '__main__': + main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py new file mode 100644 index 0000000..2782ae3 --- /dev/null +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -0,0 +1,239 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import json +import re +import socket +import threading +import traceback +import Queue +import StringIO +import wsgiref.simple_server + +import bottle + +from .. import exceptions + + +class CtxProxy(object): + + def __init__(self, ctx): + self.ctx = ctx + self.port = _get_unused_port() + self.socket_url = 'http://localhost:{0}'.format(self.port) + self.server = None + self._started = Queue.Queue(1) + self.thread = self._start_server() + self._started.get(timeout=5) + + def _start_server(self): + proxy = self + + class BottleServerAdapter(bottle.ServerAdapter): + def run(self, app): + class Server(wsgiref.simple_server.WSGIServer): + allow_reuse_address = True + + def handle_error(self, request, client_address): + pass + + class Handler(wsgiref.simple_server.WSGIRequestHandler): + def address_string(self): + return self.client_address[0] + + def log_request(*args, **kwargs): # pylint: disable=no-method-argument + if not self.quiet: + return wsgiref.simple_server.WSGIRequestHandler.log_request(*args, + **kwargs) + server = wsgiref.simple_server.make_server( + host=self.host, + port=self.port, + app=app, + server_class=Server, + handler_class=Handler) + proxy.server = server + proxy._started.put(True) + server.serve_forever(poll_interval=0.1) + + def serve(): + bottle_app = bottle.Bottle() + bottle_app.post('/', callback=self._request_handler) + bottle.run( + app=bottle_app, + host='localhost', + port=self.port, + quiet=True, + server=BottleServerAdapter) + thread = threading.Thread(target=serve) + thread.daemon = True + thread.start() + return thread + + def close(self): + if self.server: + self.server.shutdown() + self.server.server_close() + + def _request_handler(self): + request = bottle.request.body.read() # pylint: disable=no-member + response = self._process(request) + return bottle.LocalResponse( + body=response, + status=200, + headers={'content-type': 'application/json'}) + + def _process(self, request): + try: + typed_request = json.loads(request) + args = typed_request['args'] + payload = _process_ctx_request(self.ctx, args) + result_type = 'result' + if isinstance(payload, exceptions.ScriptException): + payload = dict(message=str(payload)) + result_type = 'stop_operation' + result = json.dumps({ + 'type': result_type, + 'payload': payload + }) + except Exception as e: + traceback_out = StringIO.StringIO() + traceback.print_exc(file=traceback_out) + payload = { + 'type': type(e).__name__, + 'message': str(e), + 'traceback': traceback_out.getvalue() + } + result = json.dumps({ + 'type': 'error', + 'payload': payload + }) + return result + + def __enter__(self): + return self + + def __exit__(self, *args, **kwargs): + self.close() + + +def _process_ctx_request(ctx, args): + current = ctx + num_args = len(args) + index = 0 + while index < num_args: + arg = args[index] + attr = _desugar_attr(current, arg) + if attr: + current = getattr(current, attr) + elif isinstance(current, collections.MutableMapping): + key = arg + path_dict = _PathDictAccess(current) + if index + 1 == num_args: + # read dict prop by path + value = path_dict.get(key) + current = value + elif index + 2 == num_args: + # set dict prop by path + value = args[index + 1] + path_dict.set(key, value) + current = None + else: + raise RuntimeError('Illegal argument while accessing dict') + break + elif callable(current): + kwargs = {} + remaining_args = args[index:] + if isinstance(remaining_args[-1], collections.MutableMapping): + kwargs = remaining_args[-1] + remaining_args = remaining_args[:-1] + current = current(*remaining_args, **kwargs) + break + else: + raise RuntimeError('{0} cannot be processed in {1}'.format(arg, args)) + index += 1 + if callable(current): + current = current() + return current + + +def _desugar_attr(obj, attr): + if not isinstance(attr, basestring): + return None + if hasattr(obj, attr): + return attr + attr = attr.replace('-', '_') + if hasattr(obj, attr): + return attr + return None + + +class _PathDictAccess(object): + pattern = re.compile(r"(.+)\[(\d+)\]") + + def __init__(self, obj): + self.obj = obj + + def set(self, prop_path, value): + obj, prop_name = self._get_parent_obj_prop_name_by_path(prop_path) + obj[prop_name] = value + + def get(self, prop_path): + value = self._get_object_by_path(prop_path) + return value + + def _get_object_by_path(self, prop_path, fail_on_missing=True): + # when setting a nested object, make sure to also set all the + # intermediate path objects + current = self.obj + for prop_segment in prop_path.split('.'): + match = self.pattern.match(prop_segment) + if match: + index = int(match.group(2)) + property_name = match.group(1) + if property_name not in current: + self._raise_illegal(prop_path) + if not isinstance(current[property_name], list): + self._raise_illegal(prop_path) + current = current[property_name][index] + else: + if prop_segment not in current: + if fail_on_missing: + self._raise_illegal(prop_path) + else: + current[prop_segment] = {} + current = current[prop_segment] + return current + + def _get_parent_obj_prop_name_by_path(self, prop_path): + split = prop_path.split('.') + if len(split) == 1: + return self.obj, prop_path + parent_path = '.'.join(split[:-1]) + parent_obj = self._get_object_by_path(parent_path, fail_on_missing=False) + prop_name = split[-1] + return parent_obj, prop_name + + @staticmethod + def _raise_illegal(prop_path): + raise RuntimeError('illegal path: {0}'.format(prop_path)) + + +def _get_unused_port(): + sock = socket.socket() + sock.bind(('127.0.0.1', 0)) + _, port = sock.getsockname() + sock.close() + return port http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/environment_globals.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/environment_globals.py b/aria/orchestrator/execution_plugin/environment_globals.py new file mode 100644 index 0000000..27311f0 --- /dev/null +++ b/aria/orchestrator/execution_plugin/environment_globals.py @@ -0,0 +1,60 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_initial_globals(path): + """ emulates a `globals()` call in a freshly loaded module + + The implementation of this function is likely to raise a couple of + questions. If you read the implementation and nothing bothered you, feel + free to skip the rest of this docstring. + + First, why is this function in its own module and not, say, in the same + module of the other environment-related functions? + Second, why is it implemented in such a way that copies the globals, then + deletes the item that represents this function, and then changes some + other entries? + + Well, these two questions can be answered with one (elaborate) explanation. + If this function was in the same module with the other environment-related + functions, then we would have had to delete more items in globals than just + `create_initial_globals`. That is because all of the other function names + would also be in globals, and since there is no built-in mechanism that + return the name of the user-defined objects, this approach is quite an + overkill. + + - But why do we rely on the copy-existing-globals-and-delete-entries + method, when it seems to force us to put `create_initial_globals` in its + own file? + + Well, because there is no easier method of creating globals of a newly + loaded module. + + - How about hard coding a 'global' dict? It seems that there are very few + entries: __doc__, __file__, __name__, __package__ (but don't forget + __builtins__). + + That would be coupling our implementation to a specific `globals` + implementation. What if `globals` were to change? + """ + copied_globals = globals().copy() + copied_globals.update({ + '__doc__': 'Dynamically executed script', + '__file__': path, + '__name__': '__main__', + '__package__': None + }) + del copied_globals[create_initial_globals.__name__] + return copied_globals http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/exceptions.py b/aria/orchestrator/execution_plugin/exceptions.py new file mode 100644 index 0000000..22fa9a9 --- /dev/null +++ b/aria/orchestrator/execution_plugin/exceptions.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class ProcessException(Exception): + + def __init__(self, stderr=None, stdout=None, command=None, exit_code=None): + super(ProcessException, self).__init__(stderr) + self.command = command + self.exit_code = exit_code + self.stdout = stdout + self.stderr = stderr + + +class TaskException(Exception): + pass + + +class ScriptException(Exception): + + def __init__(self, message=None, retry=None, retry_interval=None): + super(ScriptException, self).__init__(message) + self.retry = retry + self.retry_interval = retry_interval http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/local.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py new file mode 100644 index 0000000..bc2d661 --- /dev/null +++ b/aria/orchestrator/execution_plugin/local.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import subprocess +import threading +import StringIO + +from . import ctx_proxy +from . import exceptions +from . import common +from . import constants +from . import environment_globals +from . import python_script_scope + + +def run_script(ctx, script_path, process, **kwargs): + if not script_path: + ctx.task.abort('Missing script_path') + process = process or {} + script_path = common.download_script(ctx, script_path) + script_func = _get_run_script_func(script_path, process) + return script_func( + ctx=ctx, + script_path=script_path, + process=process, + operation_kwargs=kwargs) + + +def _get_run_script_func(script_path, process): + if _treat_script_as_python_script(script_path, process): + return _eval_script_func + else: + if _treat_script_as_powershell_script(script_path): + process.setdefault('command_prefix', constants.DEFAULT_POWERSHELL_EXECUTABLE) + return _execute_func + + +def _treat_script_as_python_script(script_path, process): + eval_python = process.get('eval_python') + script_extension = os.path.splitext(script_path)[1].lower() + return (eval_python is True or (script_extension == constants.PYTHON_SCRIPT_FILE_EXTENSION and + eval_python is not False)) + + +def _treat_script_as_powershell_script(script_path): + script_extension = os.path.splitext(script_path)[1].lower() + return script_extension == constants.POWERSHELL_SCRIPT_FILE_EXTENSION + + +def _eval_script_func(script_path, ctx, operation_kwargs, **_): + with python_script_scope(operation_ctx=ctx, operation_inputs=operation_kwargs): + execfile(script_path, environment_globals.create_initial_globals(script_path)) + + +def _execute_func(script_path, ctx, process, operation_kwargs): + os.chmod(script_path, 0755) + process = common.create_process_config( + script_path=script_path, + process=process, + operation_kwargs=operation_kwargs) + command = process['command'] + env = os.environ.copy() + env.update(process['env']) + ctx.logger.info('Executing: {0}'.format(command)) + common.patch_ctx(ctx) + with ctx_proxy.server.CtxProxy(ctx) as proxy: + env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url + running_process = subprocess.Popen( + command, + shell=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + env=env, + cwd=process.get('cwd'), + bufsize=1, + close_fds=not common.is_windows()) + stdout_consumer = _OutputConsumer(running_process.stdout) + stderr_consumer = _OutputConsumer(running_process.stderr) + exit_code = running_process.wait() + stdout_consumer.join() + stderr_consumer.join() + ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, command)) + + def error_check_func(): + if exit_code: + raise exceptions.ProcessException( + command=command, + exit_code=exit_code, + stdout=stdout_consumer.read_output(), + stderr=stderr_consumer.read_output()) + return common.check_error(ctx, error_check_func=error_check_func) + + +class _OutputConsumer(object): + + def __init__(self, out): + self._out = out + self._buffer = StringIO.StringIO() + self._consumer = threading.Thread(target=self._consume_output) + self._consumer.daemon = True + self._consumer.start() + + def _consume_output(self): + for line in iter(self._out.readline, b''): + self._buffer.write(line) + self._out.close() + + def read_output(self): + return self._buffer.getvalue() + + def join(self): + self._consumer.join() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/operations.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/operations.py b/aria/orchestrator/execution_plugin/operations.py new file mode 100644 index 0000000..5effa8a --- /dev/null +++ b/aria/orchestrator/execution_plugin/operations.py @@ -0,0 +1,63 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from aria.orchestrator import operation +from . import local as local_operations +from .ssh import operations as ssh_operations + + +@operation +def run_script_locally(ctx, + script_path, + process=None, + **kwargs): + return local_operations.run_script( + ctx=ctx, + script_path=script_path, + process=process, + **kwargs) + + +@operation +def run_script_with_ssh(ctx, + script_path, + fabric_env=None, + process=None, + use_sudo=False, + hide_output=None, + **kwargs): + return ssh_operations.run_script( + ctx=ctx, + script_path=script_path, + fabric_env=fabric_env, + process=process, + use_sudo=use_sudo, + hide_output=hide_output, + **kwargs) + + +@operation +def run_commands_with_ssh(ctx, + commands, + fabric_env=None, + use_sudo=False, + hide_output=None, + **_): + return ssh_operations.run_commands( + ctx=ctx, + commands=commands, + fabric_env=fabric_env, + use_sudo=use_sudo, + hide_output=hide_output) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ssh/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ssh/__init__.py b/aria/orchestrator/execution_plugin/ssh/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/aria/orchestrator/execution_plugin/ssh/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ssh/operations.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py b/aria/orchestrator/execution_plugin/ssh/operations.py new file mode 100644 index 0000000..7589d42 --- /dev/null +++ b/aria/orchestrator/execution_plugin/ssh/operations.py @@ -0,0 +1,192 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import random +import string +import tempfile +import StringIO + +import fabric.api +import fabric.context_managers +import fabric.contrib.files + +from .. import constants +from .. import exceptions +from .. import common +from .. import ctx_proxy +from . import tunnel + + +_PROXY_CLIENT_PATH = ctx_proxy.client.__file__ +if _PROXY_CLIENT_PATH.endswith('.pyc'): + _PROXY_CLIENT_PATH = _PROXY_CLIENT_PATH[:-1] + + +def run_commands(ctx, commands, fabric_env, use_sudo, hide_output, **_): + """Runs the provider 'commands' in sequence + + :param commands: a list of commands to run + :param fabric_env: fabric configuration + """ + with fabric.api.settings(_hide_output(ctx, groups=hide_output), + **_fabric_env(ctx, fabric_env, warn_only=True)): + for command in commands: + ctx.logger.info('Running command: {0}'.format(command)) + run = fabric.api.sudo if use_sudo else fabric.api.run + result = run(command) + if result.failed: + raise exceptions.ProcessException( + command=result.command, + exit_code=result.return_code, + stdout=result.stdout, + stderr=result.stderr) + + +def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, **kwargs): + process = process or {} + paths = _Paths(base_dir=process.get('base_dir', constants.DEFAULT_BASE_DIR), + local_script_path=common.download_script(ctx, script_path)) + with fabric.api.settings(_hide_output(ctx, groups=hide_output), + **_fabric_env(ctx, fabric_env, warn_only=False)): + # the remote host must have the ctx before running any fabric scripts + if not fabric.contrib.files.exists(paths.remote_ctx_path): + # there may be race conditions with other operations that + # may be running in parallel, so we pass -p to make sure + # we get 0 exit code if the directory already exists + fabric.api.run('mkdir -p {0} && mkdir -p {1}'.format(paths.remote_scripts_dir, + paths.remote_work_dir)) + # this file has to be present before using ctx + fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path) + _patch_ctx(ctx) + process = common.create_process_config( + script_path=paths.remote_script_path, + process=process, + operation_kwargs=kwargs, + quote_json_env_vars=True) + fabric.api.put(paths.local_script_path, paths.remote_script_path) + with ctx_proxy.server.CtxProxy(ctx) as proxy: + local_port = proxy.port + with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)): # pylint: disable=not-context-manager + with tunnel.remote(ctx, local_port=local_port) as remote_port: + local_socket_url = proxy.socket_url + remote_socket_url = local_socket_url.replace(str(local_port), str(remote_port)) + env_script = _write_environment_script_file( + process=process, + paths=paths, + local_socket_url=local_socket_url, + remote_socket_url=remote_socket_url) + fabric.api.put(env_script, paths.remote_env_script_path) + try: + command = 'source {0} && {1}'.format(paths.remote_env_script_path, + process['command']) + run = fabric.api.sudo if use_sudo else fabric.api.run + run(command) + except exceptions.TaskException: + return common.check_error(ctx, reraise=True) + return common.check_error(ctx) + + +def _patch_ctx(ctx): + common.patch_ctx(ctx) + original_download_resource = ctx.download_resource + original_download_resource_and_render = ctx.download_resource_and_render + + def _download_resource(func, destination, **kwargs): + handle, temp_local_path = tempfile.mkstemp() + os.close(handle) + try: + func(destination=temp_local_path, **kwargs) + return fabric.api.put(temp_local_path, destination) + finally: + os.remove(temp_local_path) + + def download_resource(destination, path=None): + _download_resource( + func=original_download_resource, + destination=destination, + path=path) + ctx.download_resource = download_resource + + def download_resource_and_render(destination, path=None, variables=None): + _download_resource( + func=original_download_resource_and_render, + destination=destination, + path=path, + variables=variables) + ctx.download_resource_and_render = download_resource_and_render + + +def _hide_output(ctx, groups): + """ Hides Fabric's output for every 'entity' in `groups` """ + groups = set(groups or []) + if not groups.issubset(constants.VALID_FABRIC_GROUPS): + ctx.task.abort('`hide_output` must be a subset of {0} (Provided: {1})' + .format(', '.join(constants.VALID_FABRIC_GROUPS), ', '.join(groups))) + return fabric.api.hide(*groups) + + +def _fabric_env(ctx, fabric_env, warn_only): + """Prepares fabric environment variables configuration""" + ctx.logger.debug('Preparing fabric environment...') + env = constants.FABRIC_ENV_DEFAULTS.copy() + env.update(fabric_env or {}) + env.setdefault('warn_only', warn_only) + if 'host_string' not in env: + env['host_string'] = ctx.task.runs_on.ip + # validations + if not env.get('host_string'): + ctx.task.abort('`host_string` not supplied and ip cannot be deduced automatically') + if not (env.get('password') or env.get('key_filename') or env.get('key')): + ctx.task.abort( + 'Access credentials not supplied ' + '(you must supply at least one of `key_filename`, `key` or `password`)') + if not env.get('user'): + ctx.task.abort('`user` not supplied') + ctx.logger.debug('Environment prepared successfully') + return env + + +def _write_environment_script_file(process, paths, local_socket_url, remote_socket_url): + env_script = StringIO.StringIO() + env = process['env'] + env['PATH'] = '{0}:$PATH'.format(paths.remote_ctx_dir) + env['PYTHONPATH'] = '{0}:$PYTHONPATH'.format(paths.remote_ctx_dir) + env_script.write('chmod +x {0}\n'.format(paths.remote_script_path)) + env_script.write('chmod +x {0}\n'.format(paths.remote_ctx_path)) + env.update({ + ctx_proxy.client.CTX_SOCKET_URL: remote_socket_url, + 'LOCAL_{0}'.format(ctx_proxy.client.CTX_SOCKET_URL): local_socket_url + }) + for key, value in env.iteritems(): + env_script.write('export {0}={1}\n'.format(key, value)) + return env_script + + +class _Paths(object): + + def __init__(self, base_dir, local_script_path): + self.local_script_path = local_script_path + self.remote_ctx_dir = base_dir + self.base_script_path = os.path.basename(self.local_script_path) + self.remote_ctx_path = '{0}/ctx'.format(self.remote_ctx_dir) + self.remote_scripts_dir = '{0}/scripts'.format(self.remote_ctx_dir) + self.remote_work_dir = '{0}/work'.format(self.remote_ctx_dir) + random_suffix = ''.join(random.choice(string.ascii_lowercase + string.digits) + for _ in range(8)) + remote_path_suffix = '{0}-{1}'.format(self.base_script_path, random_suffix) + self.remote_env_script_path = '{0}/env-{1}'.format(self.remote_scripts_dir, + remote_path_suffix) + self.remote_script_path = '{0}/{1}'.format(self.remote_scripts_dir, remote_path_suffix) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/execution_plugin/ssh/tunnel.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ssh/tunnel.py b/aria/orchestrator/execution_plugin/ssh/tunnel.py new file mode 100644 index 0000000..6fc8d54 --- /dev/null +++ b/aria/orchestrator/execution_plugin/ssh/tunnel.py @@ -0,0 +1,91 @@ +# This implementation was copied from the Fabric project directly: +# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L486 +# The purpose was to remove the rtunnel creation printouts here: +# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L547 + + +import contextlib +import select +import socket + +import fabric.api +import fabric.state +import fabric.thread_handling + + +@contextlib.contextmanager +def remote(ctx, local_port, remote_port=0, local_host='localhost', remote_bind_address='127.0.0.1'): + """Create a tunnel forwarding a locally-visible port to the remote target.""" + sockets = [] + channels = [] + thread_handlers = [] + + def accept(channel, *args, **kwargs): + # This seemingly innocent statement seems to be doing nothing + # but the truth is far from it! + # calling fileno() on a paramiko channel the first time, creates + # the required plumbing to make the channel valid for select. + # While this would generally happen implicitly inside the _forwarder + # function when select is called, it may already be too late and may + # cause the select loop to hang. + # Specifically, when new data arrives to the channel, a flag is set + # on an "event" object which is what makes the select call work. + # problem is this will only happen if the event object is not None + # and it will be not-None only after channel.fileno() has been called + # for the first time. If we wait until _forwarder calls select for the + # first time it may be after initial data has reached the channel. + # calling it explicitly here in the paramiko transport main event loop + # guarantees this will not happen. + channel.fileno() + + channels.append(channel) + sock = socket.socket() + sockets.append(sock) + + try: + sock.connect((local_host, local_port)) + except Exception as e: + try: + channel.close() + except Exception as ex2: + close_error = ' (While trying to close channel: {0})'.format(ex2) + else: + close_error = '' + ctx.task.abort('[{0}] rtunnel: cannot connect to {1}:{2} ({3}){4}' + .format(fabric.api.env.host_string, local_host, local_port, e, + close_error)) + + thread_handler = fabric.thread_handling.ThreadHandler('fwd', _forwarder, channel, sock) + thread_handlers.append(thread_handler) + + transport = fabric.state.connections[fabric.api.env.host_string].get_transport() + remote_port = transport.request_port_forward( + remote_bind_address, remote_port, handler=accept) + + try: + yield remote_port + finally: + for sock, chan, thread_handler in zip(sockets, channels, thread_handlers): + sock.close() + chan.close() + thread_handler.thread.join() + thread_handler.raise_if_needed() + transport.cancel_port_forward(remote_bind_address, remote_port) + + +def _forwarder(chan, sock): + # Bidirectionally forward data between a socket and a Paramiko channel. + while True: + read = select.select([sock, chan], [], [])[0] + if sock in read: + data = sock.recv(1024) + if len(data) == 0: + break + chan.send(data) + if chan in read: + data = chan.recv(1024) + if len(data) == 0: + break + sock.send(data) + chan.close() + sock.close() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 70324a6..8d93837 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -68,7 +68,8 @@ class OperationTask(BaseTask): retry_interval=None, ignore_failure=None, inputs=None, - plugin=None): + plugin=None, + runs_on=None): """ Creates an operation task using the name, details, node instance and any additional kwargs. :param name: the operation of the name. @@ -89,6 +90,7 @@ class OperationTask(BaseTask): if retry_interval is None else retry_interval) self.ignore_failure = (self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure) + self.runs_on = runs_on @classmethod def node_instance(cls, instance, name, inputs=None, *args, **kwargs): @@ -104,6 +106,7 @@ class OperationTask(BaseTask): operation_details=instance.node.operations[name], inputs=inputs, plugins=instance.node.plugins or [], + runs_on=model.Task.RUNS_ON_NODE, *args, **kwargs) @@ -126,18 +129,22 @@ class OperationTask(BaseTask): operation_details = getattr(instance.relationship, operation_end)[name] if operation_end == cls.SOURCE_OPERATION: plugins = instance.relationship.source_node.plugins + runs_on = model.Task.RUNS_ON_SOURCE else: plugins = instance.relationship.target_node.plugins + runs_on = model.Task.RUNS_ON_TARGET return cls._instance(instance=instance, name=name, operation_details=operation_details, inputs=inputs, plugins=plugins or [], + runs_on=runs_on, *args, **kwargs) @classmethod - def _instance(cls, instance, name, operation_details, inputs, plugins, *args, **kwargs): + def _instance(cls, instance, name, operation_details, inputs, plugins, runs_on, *args, + **kwargs): operation_mapping = operation_details.get('operation') operation_inputs = operation_details.get('inputs', {}) operation_inputs.update(inputs or {}) @@ -151,6 +158,7 @@ class OperationTask(BaseTask): operation_mapping=operation_mapping, inputs=operation_inputs, plugin=plugin, + runs_on=runs_on, *args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index 47269a3..fd83614 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -52,7 +52,6 @@ class Engine(logger.LoggerMixin): """ try: events.start_workflow_signal.send(self._workflow_context) - cancel = False while True: cancel = self._is_cancel() if cancel: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 67be2ea..1deb66a 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -135,7 +135,8 @@ class OperationTask(BaseTask): retry_interval=api_task.retry_interval, ignore_failure=api_task.ignore_failure, plugin=plugins[0] if plugins else None, - execution=self._workflow_context.execution + execution=self._workflow_context.execution, + runs_on=api_task.runs_on ) self._workflow_context.model.task.put(operation_task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 51596c3..2cc9178 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -40,6 +40,7 @@ import Queue import jsonpickle from aria.utils import imports +from aria.utils import exceptions from aria.orchestrator.workflows.executor import base from aria.orchestrator.context import serialization from aria.storage import instrumentation @@ -255,7 +256,7 @@ class _Messenger(object): data = jsonpickle.dumps({ 'type': type, 'task_id': self.task_id, - 'exception': exception, + 'exception': exceptions.wrap_if_needed(exception), 'tracked_changes': tracked_changes }) sock.send(struct.pack(_INT_FMT, len(data))) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/storage/base_model.py ---------------------------------------------------------------------- diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py index a2bbb60..29973a2 100644 --- a/aria/storage/base_model.py +++ b/aria/storage/base_model.py @@ -695,6 +695,11 @@ class TaskBase(ModelMixin): WAIT_STATES = [PENDING, RETRYING] END_STATES = [SUCCESS, FAILED] + RUNS_ON_SOURCE = 'source' + RUNS_ON_TARGET = 'target' + RUNS_ON_NODE = 'node' + RUNS_ON = (RUNS_ON_NODE, RUNS_ON_SOURCE, RUNS_ON_TARGET) + @orm.validates('max_attempts') def validate_max_attempts(self, _, value): # pylint: disable=no-self-use """Validates that max attempts is either -1 or a positive number""" @@ -718,6 +723,7 @@ class TaskBase(ModelMixin): # Operation specific fields operation_mapping = Column(String) inputs = Column(Dict) + _runs_on = Column(Enum(*RUNS_ON, name='runs_on'), name='runs_on') @property def actor(self): @@ -727,13 +733,23 @@ class TaskBase(ModelMixin): """ return self.node_instance or self.relationship_instance + @property + def runs_on(self): + if self._runs_on == self.RUNS_ON_NODE: + return self.node_instance + elif self._runs_on == self.RUNS_ON_SOURCE: + return self.relationship_instance.source_node_instance # pylint: disable=no-member + elif self._runs_on == self.RUNS_ON_TARGET: + return self.relationship_instance.target_node_instance # pylint: disable=no-member + return None + @classmethod - def as_node_instance(cls, instance, **kwargs): - return cls(node_instance=instance, **kwargs) + def as_node_instance(cls, instance, runs_on, **kwargs): + return cls(node_instance=instance, _runs_on=runs_on, **kwargs) @classmethod - def as_relationship_instance(cls, instance, **kwargs): - return cls(relationship_instance=instance, **kwargs) + def as_relationship_instance(cls, instance, runs_on, **kwargs): + return cls(relationship_instance=instance, _runs_on=runs_on, **kwargs) @staticmethod def abort(message=None): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/storage/filesystem_rapi.py ---------------------------------------------------------------------- diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py index c6b3a81..eb30e0b 100644 --- a/aria/storage/filesystem_rapi.py +++ b/aria/storage/filesystem_rapi.py @@ -132,7 +132,7 @@ class FileSystemResourceAPI(api.ResourceAPI): if os.path.isfile(resource): shutil.copy2(resource, destination) else: - dir_util.copy_tree(resource, destination) # pylint: disable=no-member + dir_util.copy_tree(resource, destination) # pylint: disable=no-member def upload(self, entry_id, source, path=None, **_): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/aria/utils/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/utils/exceptions.py b/aria/utils/exceptions.py index 0370bb3..a19eb78 100644 --- a/aria/utils/exceptions.py +++ b/aria/utils/exceptions.py @@ -16,6 +16,8 @@ import sys import linecache +import jsonpickle + from clint.textui import indent from .console import (puts, Colored) @@ -39,6 +41,7 @@ def print_exception(e, full=True, cause=False, traceback=None): traceback = e.cause_traceback if hasattr(e, 'cause_traceback') else None print_exception(e.cause, full=full, cause=True, traceback=traceback) + def print_traceback(traceback=None): """ Prints the traceback with nice colors and such. @@ -62,3 +65,19 @@ def print_traceback(traceback=None): with indent(2): puts(Colored.black(line.strip())) traceback = traceback.tb_next + + +class _WrappedException(Exception): + + def __init__(self, exception_type, exception_str): + super(_WrappedException, self).__init__(exception_type, exception_str) + self.exception_type = exception_type + self.exception_str = exception_str + + +def wrap_if_needed(exception): + try: + jsonpickle.loads(jsonpickle.dumps(exception)) + return exception + except BaseException: + return _WrappedException(type(exception).__name__, str(exception)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/requirements.txt ---------------------------------------------------------------------- diff --git a/requirements.txt b/requirements.txt index 0005a5e..055bafb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -26,3 +26,6 @@ CacheControl[filecache]==0.11.6 clint==0.5.1 SQLAlchemy==1.1.4 wagon==0.5.0 +bottle==0.12.11 +six==1.10.0 +Fabric==1.13.1 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/setup.py ---------------------------------------------------------------------- diff --git a/setup.py b/setup.py index 112f13e..5a3d016 100644 --- a/setup.py +++ b/setup.py @@ -44,6 +44,11 @@ except IOError: install_requires = [] +console_scripts = ['aria = aria.cli.cli:main'] +if os.environ.get('INSTALL_CTX'): + console_scripts.append('ctx = aria.orchestrator.execution_plugin.ctx_proxy.client:main') + + setup( name=_PACKAGE_NAME, version=version, @@ -77,8 +82,6 @@ setup( zip_safe=False, install_requires=install_requires, entry_points={ - 'console_scripts': [ - 'aria = aria.cli.cli:main' - ] + 'console_scripts': console_scripts } ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/tests/orchestrator/execution_plugin/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/__init__.py b/tests/orchestrator/execution_plugin/__init__.py new file mode 100644 index 0000000..ae1e83e --- /dev/null +++ b/tests/orchestrator/execution_plugin/__init__.py @@ -0,0 +1,14 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/3e0a578d/tests/orchestrator/execution_plugin/test_common.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_common.py b/tests/orchestrator/execution_plugin/test_common.py new file mode 100644 index 0000000..4e2f714 --- /dev/null +++ b/tests/orchestrator/execution_plugin/test_common.py @@ -0,0 +1,191 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple + +import requests +import pytest + +from aria.storage import model +from aria.orchestrator import exceptions +from aria.orchestrator.execution_plugin import common + + +class TestDownloadScript(object): + + @pytest.fixture(autouse=True) + def patch_requests(self, mocker): + def _mock_requests_get(url): + response = namedtuple('Response', 'text status_code') + return response(url, self.status_code) + self.status_code = 200 + mocker.patch.object(requests, 'get', _mock_requests_get) + + def _test_url(self, url): + class Ctx(object): + task = model.Task + + script_path = url + result = common.download_script(Ctx, script_path) + with open(result) as f: + assert script_path == f.read() + assert result.endswith('-some_script.py') + + def test_http_url(self): + self._test_url('http://localhost/some_script.py') + + def test_https_url(self): + self._test_url('https://localhost/some_script.py') + + def test_url_status_code_404(self): + self.status_code = 404 + with pytest.raises(exceptions.TaskAbortException) as exc_ctx: + self.test_http_url() + exception = exc_ctx.value + assert 'status code: 404' in str(exception) + + def test_blueprint_resource(self): + test_script_path = 'my_script.py' + + class Ctx(object): + @staticmethod + def download_resource(destination, path): + assert path == test_script_path + return destination + result = common.download_script(Ctx, test_script_path) + assert result.endswith(test_script_path) + + +class TestCreateProcessConfig(object): + + def test_plain_command(self): + script_path = 'path' + process = common.create_process_config( + script_path=script_path, + process={}, + operation_kwargs={}) + assert process['command'] == script_path + + def test_command_with_args(self): + script_path = 'path' + process = {'args': [1, 2, 3]} + process = common.create_process_config( + script_path=script_path, + process=process, + operation_kwargs={}) + assert process['command'] == '{0} 1 2 3'.format(script_path) + + def test_command_prefix(self): + script_path = 'path' + command_prefix = 'prefix' + process = {'command_prefix': command_prefix} + process = common.create_process_config( + script_path=script_path, + process=process, + operation_kwargs={}) + assert process['command'] == '{0} {1}'.format(command_prefix, script_path) + + def test_command_with_args_and_prefix(self): + script_path = 'path' + command_prefix = 'prefix' + process = {'command_prefix': command_prefix, + 'args': [1, 2, 3]} + process = common.create_process_config( + script_path=script_path, + process=process, + operation_kwargs={}) + assert process['command'] == '{0} {1} 1 2 3'.format(command_prefix, script_path) + + def test_ctx_is_removed(self): + process = common.create_process_config( + script_path='', + process={}, + operation_kwargs={'ctx': 1}) + assert 'ctx' not in process['env'] + + def test_env_passed_explicitly(self): + env = {'one': '1', 'two': '2'} + process = common.create_process_config( + script_path='', + process={'env': env}, + operation_kwargs={}) + assert process['env'] == env + + def test_env_populated_from_operation_kwargs(self): + operation_kwargs = {'one': '1', 'two': '2'} + process = common.create_process_config( + script_path='', + process={}, + operation_kwargs=operation_kwargs) + assert process['env'] == operation_kwargs + + def test_env_merged_from_operation_kwargs_and_process(self): + operation_kwargs = {'one': '1', 'two': '2'} + env = {'three': '3', 'four': '4'} + process = common.create_process_config( + script_path='', + process={'env': env}, + operation_kwargs=operation_kwargs) + assert process['env'] == dict(operation_kwargs.items() + env.items()) + + def test_process_env_gets_precedence_over_operation_kwargs(self): + operation_kwargs = {'one': 'from_kwargs'} + env = {'one': 'from_env_process'} + process = common.create_process_config( + script_path='', + process={'env': env}, + operation_kwargs=operation_kwargs) + assert process['env'] == env + + def test_json_env_vars(self): + operation_kwargs = {'a_dict': {'key': 'value'}, + 'a_list': ['a', 'b', 'c'], + 'a_tuple': (4, 5, 6), + 'a_bool': True} + process = common.create_process_config( + script_path='', + process={}, + operation_kwargs=operation_kwargs) + assert process['env'] == {'a_dict': '{"key": "value"}', + 'a_list': '["a", "b", "c"]', + 'a_tuple': '[4, 5, 6]', + 'a_bool': 'true'} + + def test_quote_json_env_vars(self): + operation_kwargs = {'one': []} + process = common.create_process_config( + script_path='', + process={}, + operation_kwargs=operation_kwargs, + quote_json_env_vars=True) + assert process['env']['one'] == "'[]'" + + @pytest.mark.skipif(not common.is_windows(), reason='windows') + def test_env_keys_converted_to_string_on_windows(self): + env = {u'one': '1'} + process = common.create_process_config( + script_path='', + process={'env': env}, + operation_kwargs={}) + assert isinstance(process['env'].keys()[0], str) + + @pytest.mark.skipif(not common.is_windows(), reason='windows') + def test_env_values_quotes_are_escaped_on_windows(self): + env = {'one': '"hello"'} + process = common.create_process_config( + script_path='', + process={'env': env}, + operation_kwargs={}) + assert process['env']['one'] == '\\"hello\\"'