Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-149-functions-in-operation-configuration c063b4097 -> 888c5cd6f (forced update)
NullPool logging messages appear during execution Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0c986842 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0c986842 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0c986842 Branch: refs/heads/ARIA-149-functions-in-operation-configuration Commit: 0c986842d52eca823ab92442dd9d77267e369ae8 Parents: 3d22d36 Author: max-orlov <[email protected]> Authored: Mon May 22 18:28:12 2017 +0300 Committer: max-orlov <[email protected]> Committed: Wed May 24 12:15:13 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/common.py | 1 - aria/orchestrator/context/operation.py | 13 +++++-- .../execution_plugin/ctx_proxy/client.py | 22 ++++++------ .../execution_plugin/ctx_proxy/server.py | 37 +++++++++++++------- aria/orchestrator/workflows/executor/process.py | 2 +- .../execution_plugin/test_ctx_proxy_server.py | 4 ++- .../orchestrator/workflows/executor/__init__.py | 2 +- 7 files changed, 52 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 0854a27..c98e026 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -16,7 +16,6 @@ """ A common context for both workflow and operation """ - import logging from contextlib import contextmanager from functools import partial http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 68a02aa..0ce790f 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -33,6 +33,7 @@ class BaseOperationContext(BaseContext): self._task_id = task_id self._actor_id = actor_id self._thread_local = threading.local() + self._destroy_session = kwargs.pop('destroy_session', False) logger_level = kwargs.pop('logger_level', None) super(BaseOperationContext, self).__init__(**kwargs) self._register_logger(task_id=self.task.id, level=logger_level) @@ -90,13 +91,21 @@ class BaseOperationContext(BaseContext): } @classmethod - def deserialize_from_dict(cls, model_storage=None, resource_storage=None, **kwargs): + def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs): if model_storage: model_storage = aria.application_model_storage(**model_storage) if resource_storage: resource_storage = aria.application_resource_storage(**resource_storage) - return cls(model_storage=model_storage, resource_storage=resource_storage, **kwargs) + return cls(model_storage=model_storage, + resource_storage=resource_storage, + destroy_session=True, + **kwargs) + + def close(self): + if self._destroy_session: + self.model.log._session.remove() + self.model.log._engine.dispose() class NodeOperationContext(BaseOperationContext): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/execution_plugin/ctx_proxy/client.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py index d965a5e..f7f56aa 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py @@ -34,22 +34,25 @@ class _RequestError(RuntimeError): self.ex_traceback = ex_traceback -def _http_request(socket_url, request, timeout): - response = urllib2.urlopen( - url=socket_url, - data=json.dumps(request), - timeout=timeout) +def _http_request(socket_url, request, method, timeout): + opener = urllib2.build_opener(urllib2.HTTPHandler) + request = urllib2.Request(socket_url, data=json.dumps(request)) + request.get_method = lambda: method + response = opener.open(request, timeout=timeout) + if response.code != 200: raise RuntimeError('Request failed: {0}'.format(response)) return json.loads(response.read()) -def _client_request(socket_url, args, timeout): +def _client_request(socket_url, args, timeout, method='POST'): response = _http_request( socket_url=socket_url, request={'args': args}, - timeout=timeout) - payload = response['payload'] + method=method, + timeout=timeout + ) + payload = response.get('payload') response_type = response.get('type') if response_type == 'error': ex_type = payload['type'] @@ -89,7 +92,7 @@ def _process_args(json_prefix, args): def main(args=None): args = _parse_args(args) response = _client_request( - socket_url=args.socket_url, + args.socket_url, args=_process_args(args.json_arg_prefix, args.args), timeout=args.timeout) if args.json_output: @@ -100,6 +103,5 @@ def main(args=None): response = str(response) sys.stdout.write(response) - if __name__ == '__main__': main() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/execution_plugin/ctx_proxy/server.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py index 52a5312..1ce0e08 100644 --- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py +++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py @@ -42,16 +42,31 @@ class CtxProxy(object): self._started.get(timeout=5) def _start_server(self): - proxy = self class BottleServerAdapter(bottle.ServerAdapter): + proxy = self + + def close_session(self): + self.proxy.ctx.model.log._session.remove() + def run(self, app): + class Server(wsgiref.simple_server.WSGIServer): allow_reuse_address = True + bottle_server = self def handle_error(self, request, client_address): pass + def serve_forever(self, poll_interval=0.5): + try: + wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval) + finally: + # Once shutdown is called, we need to close the session. + # If the session is not closed properly, it might raise warnings, + # or even lock the database. + self.bottle_server.close_session() + class Handler(wsgiref.simple_server.WSGIRequestHandler): def address_string(self): return self.client_address[0] @@ -66,8 +81,8 @@ class CtxProxy(object): app=app, server_class=Server, handler_class=Handler) - proxy.server = server - proxy._started.put(True) + self.proxy.server = server + self.proxy._started.put(True) server.serve_forever(poll_interval=0.1) def serve(): @@ -96,9 +111,10 @@ class CtxProxy(object): request = bottle.request.body.read() # pylint: disable=no-member response = self._process(request) return bottle.LocalResponse( - body=response, + body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder), status=200, - headers={'content-type': 'application/json'}) + headers={'content-type': 'application/json'} + ) def _process(self, request): try: @@ -109,10 +125,7 @@ class CtxProxy(object): if isinstance(payload, exceptions.ScriptException): payload = dict(message=str(payload)) result_type = 'stop_operation' - result = json.dumps({ - 'type': result_type, - 'payload': payload - }, cls=modeling.utils.ModelJSONEncoder) + result = {'type': result_type, 'payload': payload} except Exception as e: traceback_out = StringIO.StringIO() traceback.print_exc(file=traceback_out) @@ -121,10 +134,8 @@ class CtxProxy(object): 'message': str(e), 'traceback': traceback_out.getvalue() } - result = json.dumps({ - 'type': 'error', - 'payload': payload - }) + result = {'type': 'error', 'payload': payload} + return result def __enter__(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 824c4e1..da6bbb2 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -373,7 +373,7 @@ def _main(): # See docstring of `remove_mutable_association_listener` for further details modeling_types.remove_mutable_association_listener() try: - ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context']) + ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context']) except BaseException as e: messenger.failed(exception=e, tracked_changes=None, new_instances=None) return http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py index 98ceff9..1b19fd9 100644 --- a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py +++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py @@ -136,7 +136,7 @@ class TestCtxProxy(object): kwargs=kwargs) @pytest.fixture - def ctx(self): + def ctx(self, mocker): class MockCtx(object): pass ctx = MockCtx() @@ -160,11 +160,13 @@ class TestCtxProxy(object): ctx.stub_args = self.stub_args ctx.stub_attr = self.StubAttribute() ctx.node = self.NodeAttribute(properties) + ctx.model = mocker.MagicMock() return ctx @pytest.fixture def server(self, ctx): result = ctx_proxy.server.CtxProxy(ctx) + result._close_session = lambda *args, **kwargs: {} yield result result.close() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/tests/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py index 8ad8edb..375c44e 100644 --- a/tests/orchestrator/workflows/executor/__init__.py +++ b/tests/orchestrator/workflows/executor/__init__.py @@ -69,7 +69,7 @@ class MockContext(object): return None @classmethod - def deserialize_from_dict(cls, **kwargs): + def instantiate_from_dict(cls, **kwargs): if kwargs: return cls(storage=aria.application_model_storage(**kwargs)) else:
