Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution aea7cebc8 -> 51816fdb6
added between thread messaging Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/51816fdb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/51816fdb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/51816fdb Branch: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution Commit: 51816fdb6184cb9110c03ca8a133522717e46c8a Parents: aea7ceb Author: max-orlov <[email protected]> Authored: Tue May 23 18:17:08 2017 +0300 Committer: max-orlov <[email protected]> Committed: Tue May 23 18:45:13 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/operation.py | 13 ++--- .../execution_plugin/ctx_proxy/client.py | 54 ++++++++++++-------- .../execution_plugin/ctx_proxy/server.py | 38 ++++++++------ 3 files changed, 58 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/51816fdb/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index e0eb927..52d3ab6 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -29,11 +29,11 @@ class BaseOperationContext(BaseContext): Context object used during operation creation and execution """ - def __init__(self, task_id, actor_id, instantiated_from_dict=False, **kwargs): + def __init__(self, task_id, actor_id, **kwargs): self._task_id = task_id self._actor_id = actor_id self._thread_local = threading.local() - self._instantiated_from_dict = instantiated_from_dict + self._destroy_session = kwargs.pop('destroy_session', False) logger_level = kwargs.pop('logger_level', None) super(BaseOperationContext, self).__init__(**kwargs) self._register_logger(task_id=self.task.id, level=logger_level) @@ -99,16 +99,9 @@ class BaseOperationContext(BaseContext): return cls(model_storage=model_storage, resource_storage=resource_storage, - instantiated_from_dict=True, + destroy_session=True, **kwargs) - def __del__(self): - self.close() - - def close(self): - if self._instantiated_from_dict: - self.model.log._session.remove() - self.model.log._engine.dispose() class NodeOperationContext(BaseOperationContext): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/51816fdb/aria/orchestrator/execution_plugin/ctx_proxy/client.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py index d965a5e..f58c727 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py @@ -34,22 +34,25 @@ class _RequestError(RuntimeError): self.ex_traceback = ex_traceback -def _http_request(socket_url, request, timeout): - response = urllib2.urlopen( - url=socket_url, - data=json.dumps(request), - timeout=timeout) +def _http_request(socket_url, request, method, timeout): + opener = urllib2.build_opener(urllib2.HTTPHandler) + request = urllib2.Request(socket_url, data=json.dumps(request)) + request.get_method = lambda: method + response = opener.open(request, timeout=timeout) + if response.code != 200: raise RuntimeError('Request failed: {0}'.format(response)) return json.loads(response.read()) -def _client_request(socket_url, args, timeout): +def _client_request(socket_url, args, method, timeout): response = _http_request( socket_url=socket_url, request={'args': args}, - timeout=timeout) - payload = response['payload'] + method=method, + timeout=timeout + ) + payload = response.get('payload') response_type = response.get('type') if response_type == 'error': ex_type = payload['type'] @@ -86,20 +89,29 @@ def _process_args(json_prefix, args): return processed_args -def main(args=None): - args = _parse_args(args) - response = _client_request( - socket_url=args.socket_url, - args=_process_args(args.json_arg_prefix, args.args), - timeout=args.timeout) - if args.json_output: - response = json.dumps(response) - else: - if not response: - response = '' - response = str(response) - sys.stdout.write(response) +def _close_session(socket_url, timeout): + return _client_request(socket_url, [], 'DELETE', timeout) +def main(args=None): + try: + args = _parse_args(args) + response = _client_request(args.socket_url, + args=_process_args(args.json_arg_prefix, args.args), + method='POST', + timeout=args.timeout) + if args.json_output: + response = json.dumps(response) + else: + if not response: + response = '' + response = str(response) + sys.stdout.write(response) + except BaseException: + try: + _close_session(args.socket_url, args.timeout) + except BaseException: + pass + if __name__ == '__main__': main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/51816fdb/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index e3496b9..7296065 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -27,6 +27,7 @@ import bottle from aria import modeling from .. import exceptions +from . import client class CtxProxy(object): @@ -76,6 +77,7 @@ class CtxProxy(object): bottle_app = bottle.Bottle() bottle_app.post('/', callback=self._request_handler) + bottle_app.delete('/', callback=self._teardown_handler) bottle.run( app=bottle_app, host='localhost', @@ -88,17 +90,31 @@ class CtxProxy(object): return thread def close(self): + client._close_session(self.socket_url, 10) if self.server: self.server.shutdown() self.server.server_close() def _request_handler(self): + return self._handle(self._process) + + def _teardown_handler(self): + return self._handle(self._close_session) + + def _handle(self, handler): request = bottle.request.body.read() # pylint: disable=no-member - response = self._process(request) + response = handler(request) return bottle.LocalResponse( - body=response, + body=json.dumps(response or {}, cls=modeling.utils.ModelJSONEncoder), status=200, - headers={'content-type': 'application/json'}) + headers={'content-type': 'application/json'} + ) + + def _close_session(self, *args, **kwargs): + # Since this runs in a daemon thread, we need to close the session each time we process + # a request (a new session would be supplied by SQLAlchemy scoped_session). + # log mapi is chosen arbitrarily. + return self.ctx.model.log._session.remove() def _process(self, request): try: @@ -109,11 +125,9 @@ class CtxProxy(object): if isinstance(payload, exceptions.ScriptException): payload = dict(message=str(payload)) result_type = 'stop_operation' - result = json.dumps({ - 'type': result_type, - 'payload': payload - }, cls=modeling.utils.ModelJSONEncoder) + result = {'type': result_type, 'payload': payload} except Exception as e: + self._close_session() traceback_out = StringIO.StringIO() traceback.print_exc(file=traceback_out) payload = { @@ -121,15 +135,7 @@ class CtxProxy(object): 'message': str(e), 'traceback': traceback_out.getvalue() } - result = json.dumps({ - 'type': 'error', - 'payload': payload - }) - finally: - # Since this runs in a daemon thread, we need to close the session each time we process - # a request (a new session would be supplied by SQLAlchemy scoped_session). - # log mapi is chosen arbitrarily. - self.ctx.model.log._session.remove() + result = {'type': 'error', 'payload': payload} return result
