PROTON-1394: Python client resource cleanup, circular references and underlying C objects
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b79759d6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b79759d6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b79759d6

Branch: refs/heads/go1
Commit: b79759d6dbd2f542086bb6bcb3c94806ed818b5a
Parents: b17671e
Author: Clifford Jansen <[email protected]>
Authored: Fri Jul 28 16:44:18 2017 -0700
Committer: Clifford Jansen <[email protected]>
Committed: Fri Jul 28 16:44:18 2017 -0700

----------------------------------------------------------------------
 proton-c/bindings/python/proton/__init__.py |  9 ++++---
 proton-c/bindings/python/proton/handlers.py |  8 +++---
 proton-c/bindings/python/proton/reactor.py  | 31 +++++++++++++++++-------
 proton-c/bindings/python/proton/utils.py    | 21 ++++++++++------
 4 files changed, 45 insertions(+), 24 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/__init__.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/__init__.py b/proton-c/bindings/python/proton/__init__.py
index 2b354df..dca600b 100644
--- a/proton-c/bindings/python/proton/__init__.py
+++ b/proton-c/bindings/python/proton/__init__.py
@@ -2324,7 +2324,7 @@ class Endpoint(object):
     from . import reactor
     ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
     if ractor:
-      on_error = ractor.on_error
+      on_error = ractor.on_error_delegate()
     else:
       on_error = None
     record = self._get_attachments()
@@ -2334,7 +2334,7 @@ class Endpoint(object):
     from . import reactor
     ractor = reactor.Reactor.wrap(pn_object_reactor(self._impl))
     if ractor:
-      on_error = ractor.on_error
+      on_error = ractor.on_error_delegate()
     else:
       on_error = None
     impl = _chandler(handler, on_error)
@@ -4110,9 +4110,10 @@ class WrappedHandler(Wrapper):
     else:
       on_error(info)
 
-  def add(self, handler):
+  def add(self, handler, on_error=None):
     if handler is None: return
-    impl = _chandler(handler, self._on_error)
+    if on_error is None: on_error = self._on_error
+    impl = _chandler(handler, on_error)
     pn_handler_add(self._impl, impl)
     pn_decref(impl)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/handlers.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/handlers.py b/proton-c/bindings/python/proton/handlers.py
index 6d580b7..6d3cce5 100644
--- a/proton-c/bindings/python/proton/handlers.py
+++ b/proton-c/bindings/python/proton/handlers.py
@@ -16,7 +16,7 @@
 # specific language governing permissions and limitations
 # under the License.
 #
-import heapq, logging, os, re, socket, time, types
+import heapq, logging, os, re, socket, time, types, weakref
 
 from proton import dispatch, generate_uuid, PN_ACCEPTED, SASL, symbol, ulong, Url
 from proton import Collector, Connection, Delivery, Described, Endpoint, Event, Link, Terminus, Timeout
@@ -390,9 +390,9 @@ class MessagingHandler(Handler, Acking):
         self.handlers = []
         if prefetch:
             self.handlers.append(CFlowController(prefetch))
-        self.handlers.append(EndpointStateHandler(peer_close_is_error, self))
-        self.handlers.append(IncomingMessageHandler(auto_accept, self))
-        self.handlers.append(OutgoingMessageHandler(auto_settle, self))
+        self.handlers.append(EndpointStateHandler(peer_close_is_error, weakref.proxy(self)))
+        self.handlers.append(IncomingMessageHandler(auto_accept, weakref.proxy(self)))
+        self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self)))
         self.fatal_conditions = ["amqp:unauthorized-access"]
 
     def on_transport_error(self, event):

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/reactor.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py
index bffeea1..5f6d8cb 100644
--- a/proton-c/bindings/python/proton/reactor.py
+++ b/proton-c/bindings/python/proton/reactor.py
@@ -84,20 +84,33 @@ class Reactor(Wrapper):
     def __init__(self, *handlers, **kwargs):
         Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments)
         for h in handlers:
-            self.handler.add(h)
+            self.handler.add(h, on_error=self.on_error_delegate())
 
     def _init(self):
         self.errors = []
 
+    # on_error relay handler tied to underlying C reactor. Use when the
+    # error will always be generated from a callback from this reactor.
+    # Needed to prevent reference cycles and be compatible with wrappers.
+    class ErrorDelegate(object):
+        def __init__(self, reactor):
+            self.reactor_impl = reactor._impl
+        def on_error(self, info):
+            ractor = Reactor.wrap(self.reactor_impl)
+            ractor.on_error(info)
+
+    def on_error_delegate(self):
+        return Reactor.ErrorDelegate(self).on_error
+
     def on_error(self, info):
         self.errors.append(info)
         self.yield_()
 
     def _get_global(self):
-        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error)
+        return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate())
 
     def _set_global(self, handler):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         pn_reactor_set_global_handler(self._impl, impl)
         pn_decref(impl)
 
@@ -118,10 +131,10 @@ class Reactor(Wrapper):
         return pn_reactor_mark(self._impl)
 
     def _get_handler(self):
-        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error)
+        return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate())
 
     def _set_handler(self, handler):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         pn_reactor_set_handler(self._impl, impl)
         pn_decref(impl)
 
@@ -164,13 +177,13 @@ class Reactor(Wrapper):
         self._check_errors()
 
     def schedule(self, delay, task):
-        impl = _chandler(task, self.on_error)
+        impl = _chandler(task, self.on_error_delegate())
         task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl))
         pn_decref(impl)
         return task
 
     def acceptor(self, host, port, handler=None):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl)
         pn_decref(impl)
         if aimpl:
@@ -181,7 +194,7 @@ class Reactor(Wrapper):
     def connection(self, handler=None):
         """Deprecated: use connection_to_host() instead
         """
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         result = Connection.wrap(pn_reactor_connection(self._impl, impl))
         if impl: pn_decref(impl)
         return result
@@ -215,7 +228,7 @@ class Reactor(Wrapper):
         return utf82unicode(_url)
 
     def selectable(self, handler=None):
-        impl = _chandler(handler, self.on_error)
+        impl = _chandler(handler, self.on_error_delegate())
         result = Selectable.wrap(pn_reactor_selectable(self._impl))
         if impl:
             record = pn_selectable_attachments(result._impl)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b79759d6/proton-c/bindings/python/proton/utils.py
----------------------------------------------------------------------
diff --git a/proton-c/bindings/python/proton/utils.py b/proton-c/bindings/python/proton/utils.py
index 05ef80d..d0679ae 100644
--- a/proton-c/bindings/python/proton/utils.py
+++ b/proton-c/bindings/python/proton/utils.py
@@ -137,10 +137,10 @@ class BlockingReceiver(BlockingLink):
     def __del__(self):
         self.fetcher = None
         # The next line causes a core dump if the Proton-C reactor finalizes
-        # first. The self.container reference prevents reactor finalization
-        # until after it is set to None.
-        self.link.handler = None
-        self.container = None
+        # first. The self.container reference prevents out of order reactor
+        # finalization. It may not be set if exception in BlockingLink.__init__
+        if hasattr(self, "container"):
+            self.link.handler = None # implicit call to reactor
 
     def receive(self, timeout=False):
         if not self.fetcher:
@@ -208,9 +208,16 @@ class BlockingConnection(Handler):
         self.container.timeout = self.timeout
         self.container.start()
         self.url = Url(url).defaults()
-        self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
-        self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
-                  msg="Opening connection")
+        self.conn = None
+        failed = True
+        try:
+            self.conn = self.container.connect(url=self.url, handler=self, ssl_domain=ssl_domain, reconnect=False, heartbeat=heartbeat, **kwargs)
+            self.wait(lambda: not (self.conn.state & Endpoint.REMOTE_UNINIT),
+                      msg="Opening connection")
+            failed = False
+        finally:
+            if failed and self.conn:
+                self.close()
 
     def create_sender(self, address, handler=None, name=None, options=None):
         return BlockingSender(self, self.container.create_sender(self.conn, address, name=name, handler=handler, options=options))

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
