Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution c1868c2b4 -> 0f2df7d1b
removed endpoint for session termination. clearing a session is done in-house Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0f2df7d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0f2df7d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0f2df7d1 Branch: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution Commit: 0f2df7d1b01fc515f578cc02ad9594330a5bbccf Parents: c1868c2 Author: max-orlov <[email protected]> Authored: Wed May 24 01:13:04 2017 +0300 Committer: max-orlov <[email protected]> Committed: Wed May 24 01:13:04 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/operation.py | 5 +++ .../execution_plugin/ctx_proxy/client.py | 34 +++++---------- .../execution_plugin/ctx_proxy/server.py | 46 +++++++++++--------- aria/orchestrator/workflows/executor/process.py | 3 ++ 4 files changed, 46 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 52d3ab6..0ce790f 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -102,6 +102,11 @@ class BaseOperationContext(BaseContext): destroy_session=True, **kwargs) + def close(self): + if self._destroy_session: + self.model.log._session.remove() + self.model.log._engine.dispose() + class NodeOperationContext(BaseOperationContext): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/execution_plugin/ctx_proxy/client.py ---------------------------------------------------------------------- diff --git 
a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py index fbfff57..f7f56aa 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py @@ -89,29 +89,19 @@ def _process_args(json_prefix, args): return processed_args -def _close_session(socket_url, timeout): - return _client_request(socket_url, [], timeout, 'DELETE') - - def main(args=None): - try: - args = _parse_args(args) - response = _client_request( - args.socket_url, - args=_process_args(args.json_arg_prefix, args.args), - timeout=args.timeout) - if args.json_output: - response = json.dumps(response) - else: - if not response: - response = '' - response = str(response) - sys.stdout.write(response) - except BaseException as origial_e: - try: - _close_session(args.socket_url, args.timeout) - except BaseException: - raise origial_e + args = _parse_args(args) + response = _client_request( + args.socket_url, + args=_process_args(args.json_arg_prefix, args.args), + timeout=args.timeout) + if args.json_output: + response = json.dumps(response) + else: + if not response: + response = '' + response = str(response) + sys.stdout.write(response) if __name__ == '__main__': main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 6e5b0e3..d9195ff 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -27,7 +27,6 @@ import bottle from aria import modeling from .. import exceptions -from . 
import client class CtxProxy(object): @@ -38,21 +37,39 @@ class CtxProxy(object): self.port = _get_unused_port() self.socket_url = 'http://localhost:{0}'.format(self.port) self.server = None + self.bottle_server = None self._started = Queue.Queue(1) + self._terminated = Queue.Queue(1) self.thread = self._start_server() - self._started.get(timeout=5) + self._started.get(timeout=50) def _start_server(self): proxy = self class BottleServerAdapter(bottle.ServerAdapter): + def __init__(self, _session, _terminated, *args, **kwargs): + super(BottleServerAdapter, self).__init__(*args, **kwargs) + self._session = _session + self._terminated = _terminated + def run(self, app): + class Server(wsgiref.simple_server.WSGIServer): allow_reuse_address = True + bottle_server = self def handle_error(self, request, client_address): pass + def serve_forever(self, poll_interval=0.5): + try: + wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval) + # Once shutdown is called, we need to close the session + self.bottle_server._session.remove() + finally: + # only after we tried to close the session, we can proceed. + self.bottle_server._terminated.put(True) + class Handler(wsgiref.simple_server.WSGIRequestHandler): def address_string(self): return self.client_address[0] @@ -77,46 +94,36 @@ class CtxProxy(object): bottle_app = bottle.Bottle() bottle_app.post('/', callback=self._request_handler) - bottle_app.delete('/', callback=self._teardown_handler) bottle.run( app=bottle_app, host='localhost', port=self.port, quiet=True, - server=BottleServerAdapter) + server=BottleServerAdapter, + _session=proxy.ctx.model.log._session, + _terminated=self._terminated) thread = threading.Thread(target=serve) thread.daemon = True thread.start() return thread def close(self): - client._close_session(self.socket_url, 10) if self.server: self.server.shutdown() + while self._started.not_empty and self._terminated.empty(): + # wait for the process of shutdown to complete (i.e. 
the session is terminated) + pass self.server.server_close() def _request_handler(self): - return self._handle(self._process) - - def _teardown_handler(self): - return self._handle(self._close_session) - - def _handle(self, handler): request = bottle.request.body.read() # pylint: disable=no-member - response = handler(request) + response = self._process(request) return bottle.LocalResponse( body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder), status=200, headers={'content-type': 'application/json'} ) - def _close_session(self, *args, **kwargs): - # Since this runs in a daemon thread, we need to close the session each time we process - # a request (a new session would be supplied by SQLAlchemy scoped_session). - # log mapi is chosen arbitrarily. - self.ctx.model.log._session.remove() - return {} - def _process(self, request): try: typed_request = json.loads(request) @@ -128,7 +135,6 @@ class CtxProxy(object): result_type = 'stop_operation' result = {'type': result_type, 'payload': payload} except Exception as e: - self._close_session() traceback_out = StringIO.StringIO() traceback.print_exc(file=traceback_out) payload = { http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0f2df7d1/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index da6bbb2..23cf9ff 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -387,13 +387,16 @@ def _main(): for decorate in process_executor.decorate(): task_func = decorate(task_func) task_func(ctx=ctx, **operation_inputs) + ctx.close() messenger.succeeded(tracked_changes=instrument.tracked_changes, new_instances=instrument.new_instances) except BaseException as e: + ctx.close() messenger.failed(exception=e, tracked_changes=instrument.tracked_changes, 
new_instances=instrument.new_instances) finally: + ctx.close() instrument.expunge_session() if __name__ == '__main__':
