This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 1d6e14f8bb077f584afbe5419aab7ed78d422f9b
Author: Andrew Stitcher <[email protected]>
AuthorDate: Tue Jan 15 16:41:57 2019 -0500

    PROTON-1992: [Python] Remove dependency on Proton Reactor API

    - Python binding now only uses APIs from Proton Core library. It uses
      Python APIs to do all IO and uses Proton purely to process the AMQP
      protocol.
    - It is very compatible with the existing higher level Python APIs.
      [In modules proton, proton.reactor, proton.handlers, proton.utils]
    - Passes the python tests as well as before
    - Works with Python 2 and Python 3
    - Works on Unix and Windows
    - Runs all the python examples
---
 python/CMakeLists.txt                |   3 +-
 python/proton/_endpoints.py          |  56 ++---
 python/proton/_events.py             | 250 ++++++++++++--------
 python/proton/_handlers.py           | 214 +++++++++++++++--
 python/proton/_io.py                 | 138 +++++++++++
 python/proton/_message.py            |  13 +-
 python/proton/_reactor.py            | 438 ++++++++++++++++++++++-------------
 python/proton/_reactor_impl.py       | 217 -----------------
 python/proton/_selectable.py         |  93 ++++++++
 python/proton/_transport.py          |   5 +
 python/proton/_utils.py              |   4 +-
 python/proton/_wrapper.py            |  15 ++
 python/tests/proton_tests/handler.py |   2 +-
 python/tests/proton_tests/reactor.py |   8 +-
 14 files changed, 922 insertions(+), 534 deletions(-)

diff --git a/python/CMakeLists.txt b/python/CMakeLists.txt index c9e659e..a02c401 100644 --- a/python/CMakeLists.txt +++ b/python/CMakeLists.txt @@ -72,6 +72,7 @@ set (pysrc proton/_endpoints.py proton/_events.py proton/_exceptions.py + proton/_io.py proton/_message.py proton/_transport.py proton/_url.py @@ -83,7 +84,7 @@ set (pysrc proton/_handlers.py proton/_reactor.py - proton/_reactor_impl.py + proton/_selectable.py proton/_utils.py ) # extra files included in the source distribution diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py index bf72727..e873710 100644 --- a/python/proton/_endpoints.py +++ b/python/proton/_endpoints.py @@ -27,7 +27,6 @@ import weakref from cproton import 
PN_LOCAL_UNINIT, PN_REMOTE_UNINIT, PN_LOCAL_ACTIVE, PN_REMOTE_ACTIVE, PN_LOCAL_CLOSED, \ PN_REMOTE_CLOSED, \ - pn_object_reactor, pn_record_get_handler, pn_record_set_handler, pn_decref, \ pn_connection, pn_connection_attachments, pn_connection_transport, pn_connection_error, pn_connection_condition, \ pn_connection_remote_condition, pn_connection_collect, pn_connection_set_container, pn_connection_get_container, \ pn_connection_get_hostname, pn_connection_set_hostname, pn_connection_get_user, pn_connection_set_user, \ @@ -81,6 +80,7 @@ class Endpoint(object): def _init(self): self.condition = None + self._handler = None def _update_cond(self): obj2cond(self.condition, self._get_cond_impl()) @@ -97,35 +97,21 @@ class Endpoint(object): assert False, "Subclass must override this!" def _get_handler(self): - from . import _reactor - from . import _reactor_impl - ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl)) - if ractor: - on_error = ractor.on_error_delegate() - else: - on_error = None - record = self._get_attachments() - return _reactor_impl.WrappedHandler.wrap(pn_record_get_handler(record), on_error) + return self._handler def _set_handler(self, handler): - from . import _reactor - from . 
import _reactor_impl - ractor = _reactor.Reactor.wrap(pn_object_reactor(self._impl)) - if ractor: - on_error = ractor.on_error_delegate() + # TODO Hack This is here for some very odd (IMO) backwards compat behaviour + from ._events import Handler + if handler is None: + self._handler = None + elif issubclass(type(handler), Handler): + self._handler = handler else: - on_error = None - impl = _reactor_impl._chandler(handler, on_error) - record = self._get_attachments() - pn_record_set_handler(record, impl) - pn_decref(impl) + self._handler = Handler() + self._handler.add(handler) handler = property(_get_handler, _set_handler) - @property - def transport(self): - return self.connection.transport - class Connection(Wrapper, Endpoint): """ @@ -147,6 +133,8 @@ class Connection(Wrapper, Endpoint): self.offered_capabilities = None self.desired_capabilities = None self.properties = None + self.url = None + self._acceptor = None def _get_attachments(self): return pn_connection_attachments(self._impl) @@ -183,7 +171,7 @@ class Connection(Wrapper, Endpoint): return utf82unicode(pn_connection_get_container(self._impl)) def _set_container(self, name): - return pn_connection_set_container(self._impl, unicode2utf8(name)) + pn_connection_set_container(self._impl, unicode2utf8(name)) container = property(_get_container, _set_container) @@ -191,7 +179,7 @@ class Connection(Wrapper, Endpoint): return utf82unicode(pn_connection_get_hostname(self._impl)) def _set_hostname(self, name): - return pn_connection_set_hostname(self._impl, unicode2utf8(name)) + pn_connection_set_hostname(self._impl, unicode2utf8(name)) hostname = property(_get_hostname, _set_hostname, doc=""" @@ -206,7 +194,7 @@ and SASL layers to identify the peer. 
return utf82unicode(pn_connection_get_user(self._impl)) def _set_user(self, name): - return pn_connection_set_user(self._impl, unicode2utf8(name)) + pn_connection_set_user(self._impl, unicode2utf8(name)) user = property(_get_user, _set_user) @@ -214,7 +202,7 @@ and SASL layers to identify the peer. return None def _set_password(self, name): - return pn_connection_set_password(self._impl, unicode2utf8(name)) + pn_connection_set_password(self._impl, unicode2utf8(name)) password = property(_get_password, _set_password) @@ -243,6 +231,10 @@ and SASL layers to identify the peer. """The properties specified by the remote peer for this connection.""" return dat2obj(pn_connection_remote_properties(self._impl)) + @property + def connected_address(self): + return self.url and str(self.url) + def open(self): """ Opens the connection. @@ -374,6 +366,10 @@ class Session(Wrapper, Endpoint): def connection(self): return Connection.wrap(pn_session_connection(self._impl)) + @property + def transport(self): + return self.connection.transport + def sender(self, name): return Sender(pn_sender(self._impl, unicode2utf8(name))) @@ -486,6 +482,10 @@ class Link(Wrapper, Endpoint): """The connection on which this link was attached.""" return self.session.connection + @property + def transport(self): + return self.session.transport + def delivery(self, tag): return Delivery(pn_delivery(self._impl, tag)) diff --git a/python/proton/_events.py b/python/proton/_events.py index c6d5459..d322b2e 100644 --- a/python/proton/_events.py +++ b/python/proton/_events.py @@ -22,26 +22,23 @@ from __future__ import absolute_import import threading from cproton import PN_SESSION_REMOTE_CLOSE, PN_SESSION_FINAL, pn_event_context, pn_collector_put, \ - PN_SELECTABLE_UPDATED, pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_attachments, pn_event_type, \ - pn_collector_free, pn_handler_dispatch, PN_SELECTABLE_WRITABLE, PN_SELECTABLE_INIT, PN_SESSION_REMOTE_OPEN, \ - pn_collector_peek, PN_CONNECTION_BOUND, 
PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \ + pn_collector, PN_CONNECTION_REMOTE_OPEN, pn_event_type, \ + pn_collector_free, pn_collector_release, PN_SESSION_REMOTE_OPEN, \ + pn_collector_peek, pn_collector_more, PN_CONNECTION_BOUND, PN_LINK_FLOW, pn_event_connection, PN_LINK_LOCAL_CLOSE, \ PN_TRANSPORT_ERROR, PN_CONNECTION_LOCAL_OPEN, PN_CONNECTION_LOCAL_CLOSE, pn_event_delivery, \ - PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, pn_event_reactor, \ + PN_LINK_REMOTE_OPEN, PN_TRANSPORT_CLOSED, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT, \ PN_CONNECTION_REMOTE_CLOSE, pn_collector_pop, PN_LINK_INIT, pn_event_link, PN_CONNECTION_UNBOUND, \ - pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, PN_REACTOR_INIT, PN_REACTOR_QUIESCED, \ - PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name, PN_SELECTABLE_READABLE, \ - pn_event_transport, PN_TRANSPORT_TAIL_CLOSED, PN_SELECTABLE_FINAL, PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \ - PN_SESSION_LOCAL_CLOSE, pn_event_copy, PN_REACTOR_FINAL, PN_LINK_LOCAL_OPEN, PN_SELECTABLE_EXPIRED, \ - PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, pn_event_root, PN_SELECTABLE_ERROR, \ + pn_event_type_name, pn_event_session, PN_LINK_FINAL, pn_py2void, \ + PN_LINK_LOCAL_DETACH, PN_SESSION_INIT, PN_CONNECTION_FINAL, PN_TIMER_TASK, pn_class_name,\ + pn_event_transport, PN_TRANSPORT_TAIL_CLOSED,PN_SESSION_LOCAL_OPEN, PN_DELIVERY, \ + PN_SESSION_LOCAL_CLOSE, PN_LINK_LOCAL_OPEN, \ + PN_LINK_REMOTE_DETACH, PN_PYREF, PN_LINK_REMOTE_CLOSE, \ PN_CONNECTION_INIT, pn_event_class, pn_void2py, pn_cast_pn_session, pn_cast_pn_link, pn_cast_pn_delivery, \ - pn_cast_pn_transport, pn_cast_pn_connection, pn_cast_pn_selectable + pn_cast_pn_transport, pn_cast_pn_connection -from ._common import Constant from ._delivery import Delivery from ._endpoints import Connection, Session, Link -from ._reactor_impl import Selectable, WrappedHandler from ._transport import Transport 
-from ._wrapper import Wrapper class Collector: @@ -55,10 +52,16 @@ class Collector: def peek(self): return Event.wrap(pn_collector_peek(self._impl)) + def more(self): + return pn_collector_more(self._impl) + def pop(self): ev = self.peek() pn_collector_pop(self._impl) + def release(self): + pn_collector_release(self._impl) + def __del__(self): pn_collector_free(self._impl) del self._impl @@ -104,38 +107,52 @@ class EventType(object): self._lock.release() def __repr__(self): + return "EventType(name=%s, number=%d)" % (self.name, self.number) + + def __str__(self): return self.name def _dispatch(handler, method, *args): m = getattr(handler, method, None) if m: - return m(*args) + m(*args) elif hasattr(handler, "on_unhandled"): - return handler.on_unhandled(method, *args) + handler.on_unhandled(method, *args) class EventBase(object): - def __init__(self, clazz, context, type): - self.clazz = clazz - self.context = context - self.type = type - - def dispatch(self, handler): - return _dispatch(handler, self.type.method, self) + def __init__(self, type): + self._type = type + @property + def type(self): + return self._type -def _none(x): return None + @property + def handler(self): + return None + def dispatch(self, handler, type=None): + type = type or self._type + _dispatch(handler, type.method, self) + if hasattr(handler, "handlers"): + for h in handler.handlers: + self.dispatch(h, type) -DELEGATED = Constant("DELEGATED") + def __repr__(self): + return "%s(%r)" % (self._type, self.context) def _core(number, method): return EventType(number=number, method=method) +def _internal(name): + return EventType(name=name) + + wrappers = { "pn_void": lambda x: pn_void2py(x), "pn_pyref": lambda x: pn_void2py(x), @@ -143,16 +160,11 @@ wrappers = { "pn_session": lambda x: Session.wrap(pn_cast_pn_session(x)), "pn_link": lambda x: Link.wrap(pn_cast_pn_link(x)), "pn_delivery": lambda x: Delivery.wrap(pn_cast_pn_delivery(x)), - "pn_transport": lambda x: 
Transport.wrap(pn_cast_pn_transport(x)), - "pn_selectable": lambda x: Selectable.wrap(pn_cast_pn_selectable(x)) + "pn_transport": lambda x: Transport.wrap(pn_cast_pn_transport(x)) } -class Event(Wrapper, EventBase): - REACTOR_INIT = _core(PN_REACTOR_INIT, "on_reactor_init") - REACTOR_QUIESCED = _core(PN_REACTOR_QUIESCED, "on_reactor_quiesced") - REACTOR_FINAL = _core(PN_REACTOR_FINAL, "on_reactor_final") - +class Event(EventBase): TIMER_TASK = _core(PN_TIMER_TASK, "on_timer_task") CONNECTION_INIT = _core(PN_CONNECTION_INIT, "on_connection_init") @@ -189,107 +201,159 @@ class Event(Wrapper, EventBase): TRANSPORT_TAIL_CLOSED = _core(PN_TRANSPORT_TAIL_CLOSED, "on_transport_tail_closed") TRANSPORT_CLOSED = _core(PN_TRANSPORT_CLOSED, "on_transport_closed") - SELECTABLE_INIT = _core(PN_SELECTABLE_INIT, "on_selectable_init") - SELECTABLE_UPDATED = _core(PN_SELECTABLE_UPDATED, "on_selectable_updated") - SELECTABLE_READABLE = _core(PN_SELECTABLE_READABLE, "on_selectable_readable") - SELECTABLE_WRITABLE = _core(PN_SELECTABLE_WRITABLE, "on_selectable_writable") - SELECTABLE_EXPIRED = _core(PN_SELECTABLE_EXPIRED, "on_selectable_expired") - SELECTABLE_ERROR = _core(PN_SELECTABLE_ERROR, "on_selectable_error") - SELECTABLE_FINAL = _core(PN_SELECTABLE_FINAL, "on_selectable_final") + # These events are now internal events in the python code + REACTOR_INIT = _internal("reactor_init") + REACTOR_QUIESCED = _internal("reactor_quiesced") + REACTOR_FINAL = _internal("reactor_final") + + SELECTABLE_INIT = _internal("selectable_init") + SELECTABLE_UPDATED = _internal("selectable_updated") + SELECTABLE_READABLE = _internal("selectable_readable") + SELECTABLE_WRITABLE = _internal("selectable_writable") + SELECTABLE_EXPIRED = _internal("selectable_expired") + SELECTABLE_ERROR = _internal("selectable_error") + SELECTABLE_FINAL = _internal("selectable_final") @staticmethod - def wrap(impl, number=None): + def wrap(impl): if impl is None: return None - if number is None: - number = 
pn_event_type(impl) + number = pn_event_type(impl) + cls = pn_event_class(impl) - event = Event(impl, number) + if cls: + clsname = pn_class_name(cls) + context = wrappers[clsname](pn_event_context(impl)) - # check for an application defined ApplicationEvent and return that. This - # avoids an expensive wrap operation invoked by event.context - if pn_event_class(impl) == PN_PYREF and \ - isinstance(event.context, EventBase): - return event.context + # check for an application defined ApplicationEvent and return that. This + # avoids an expensive wrap operation invoked by event.context + if cls == PN_PYREF and isinstance(context, EventBase): + return context else: - return event + clsname = None - def __init__(self, impl, number): - Wrapper.__init__(self, impl, pn_event_attachments) - self.__dict__["type"] = EventType.TYPES[number] + event = Event(impl, number, clsname, context) + return event - def _init(self): - pass + def __init__(self, impl, number, clsname, context): + self._type = EventType.TYPES[number] + self._clsname = clsname + self._context = context - def copy(self): - copy = pn_event_copy(self._impl) - return Event.wrap(copy) + # Do all this messing around to avoid duplicate wrappers + if issubclass(type(context), Delivery): + self._delivery = context + else: + self._delivery = Delivery.wrap(pn_event_delivery(impl)) + if self._delivery: + self._link = self._delivery.link + elif issubclass(type(context), Link): + self._link = context + else: + self._link = Link.wrap(pn_event_link(impl)) + if self._link: + self._session = self._link.session + elif issubclass(type(context), Session): + self._session = context + else: + self._session = Session.wrap(pn_event_session(impl)) + if self._session: + self._connection = self._session.connection + elif issubclass(type(context), Connection): + self._connection = context + else: + self._connection = Connection.wrap(pn_event_connection(impl)) - @property - def clazz(self): - cls = pn_event_class(self._impl) - if cls: - 
return pn_class_name(cls) + if issubclass(type(context), Transport): + self._transport = context else: - return None + self._transport = Transport.wrap(pn_event_transport(impl)) @property - def root(self): - return WrappedHandler.wrap(pn_event_root(self._impl)) + def clazz(self): + return self._clsname @property def context(self): - """Returns the context object associated with the event. The type of this depend on the type of event.""" - return wrappers[self.clazz](pn_event_context(self._impl)) + """Returns the context object associated with the event. The type of this depends on the type of event.""" + return self._context - def dispatch(self, handler, type=None): - type = type or self.type - if isinstance(handler, WrappedHandler): - pn_handler_dispatch(handler._impl, self._impl, type.number) - else: - result = _dispatch(handler, type.method, self) - if result != DELEGATED and hasattr(handler, "handlers"): - for h in handler.handlers: - self.dispatch(h, type) + @property + def handler(self): + l = self.link + if l: + h = l.handler + if h: + return h + s = self.session + if s: + h = s.handler + if h: + return h + c = self.connection + if c: + h = c.handler + if h: + return h + c = self.context + if not c or not hasattr(c, 'handler'): + return None + h = c.handler + return h @property def reactor(self): - """Returns the reactor associated with the event.""" - return wrappers.get("pn_reactor", _none)(pn_event_reactor(self._impl)) + """ + Deprecated: Returns the container (was reactor) associated with the event. + """ + return self.container + + @property + def container(self): + """ + Returns the container associated with the event. 
+ """ + return self._transport._reactor def __getattr__(self, name): - r = self.reactor - if r and hasattr(r, 'subclass') and r.subclass.__name__.lower() == name: - return r - else: - return super(Event, self).__getattr__(name) + """ + This will look for a property of the event as an attached context object of the same + type as the property (but lowercase) + """ + c = self.context + # Direct type or subclass of type + if type(c).__name__.lower() == name or name in [x.__name__.lower() for x in type(c).__bases__]: + return c + + # If the attached object is the wrong type then see if *it* has a property of that name + return getattr(c, name, None) @property def transport(self): """Returns the transport associated with the event, or null if none is associated with it.""" - return Transport.wrap(pn_event_transport(self._impl)) + return self._transport @property def connection(self): """Returns the connection associated with the event, or null if none is associated with it.""" - return Connection.wrap(pn_event_connection(self._impl)) + return self._connection @property def session(self): """Returns the session associated with the event, or null if none is associated with it.""" - return Session.wrap(pn_event_session(self._impl)) + return self._session @property def link(self): """Returns the link associated with the event, or null if none is associated with it.""" - return Link.wrap(pn_event_link(self._impl)) + return self._link @property def sender(self): """Returns the sender link associated with the event, or null if none is associated with it. This is essentially an alias for - link(), that does an additional checkon the type of the + link(), that does an additional check on the type of the link.""" l = self.link if l and l.is_sender: @@ -301,7 +365,7 @@ class Event(Wrapper, EventBase): def receiver(self): """Returns the receiver link associated with the event, or null if none is associated with it. 
This is essentially an alias for - link(), that does an additional checkon the type of the link.""" + link(), that does an additional check on the type of the link.""" l = self.link if l and l.is_receiver: return l @@ -311,10 +375,7 @@ class Event(Wrapper, EventBase): @property def delivery(self): """Returns the delivery associated with the event, or null if none is associated with it.""" - return Delivery.wrap(pn_event_delivery(self._impl)) - - def __repr__(self): - return "%s(%s)" % (self.type, self.context) + return self._delivery class LazyHandlers(object): @@ -329,5 +390,10 @@ class LazyHandlers(object): class Handler(object): handlers = LazyHandlers() + # TODO What to do with on_error? + def add(self, handler, on_error=None): + """Add a child handler""" + self.handlers.append(handler) + def on_unhandled(self, method, *args): pass diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py index f8d5413..c946a3d 100644 --- a/python/proton/_handlers.py +++ b/python/proton/_handlers.py @@ -22,13 +22,15 @@ from __future__ import absolute_import import logging import time import weakref -from select import select from ._delivery import Delivery from ._endpoints import Endpoint -from ._message import Message -from ._exceptions import ProtonException from ._events import Handler, _dispatch +from ._exceptions import ProtonException +from ._io import IO +from ._message import Message +from ._transport import Transport +from ._url import Url log = logging.getLogger("proton") @@ -672,15 +674,6 @@ CFlowController = FlowController CHandshaker = Handshaker -from ._reactor_impl import WrappedHandler -from cproton import pn_iohandler - -class IOHandler(WrappedHandler): - - def __init__(self): - WrappedHandler.__init__(self, pn_iohandler) - - class PythonIO: def __init__(self): @@ -726,13 +719,11 @@ class PythonIO: timeout = deadline - time.time() else: timeout = reactor.timeout - if (timeout < 0): timeout = 0 + if timeout < 0: timeout = 0 timeout = min(timeout, 
reactor.timeout) - readable, writable, _ = select(reading, writing, [], timeout) + readable, writable, _ = IO.select(reading, writing, [], timeout) - reactor.mark() - - now = time.time() + now = reactor.mark() for s in readable: s.readable() @@ -743,3 +734,192 @@ class PythonIO: s.expired() reactor.yield_() + + +# For C style IO handler need to implement Selector +class IOHandler(Handler): + + def __init__(self): + self._selector = IO.Selector() + + def on_selectable_init(self, event): + s = event.selectable + self._selector.add(s) + s._reactor._selectables += 1 + + def on_selectable_updated(self, event): + s = event.selectable + self._selector.update(s) + + def on_selectable_final(self, event): + s = event.selectable + self._selector.remove(s) + s._reactor._selectables -= 1 + s.release() + + def on_reactor_quiesced(self, event): + r = event.reactor + + if not r.quiesced: + return + + d = r.timer_deadline + readable, writable, expired = self._selector.select(r.timeout) + + now = r.mark() + + for s in readable: + s.readable() + for s in writable: + s.writable() + for s in expired: + s.expired() + + r.yield_() + + def on_selectable_readable(self, event): + s = event.selectable + t = s._transport + + # If we're an acceptor we can't have a transport + # and we don't want to do anything here in any case + if not t: + return + + capacity = t.capacity() + if capacity > 0: + try: + b = s.recv(capacity) + if len(b) > 0: + n = t.push(b) + else: + # EOF handling + self.on_selectable_error(event) + except: + # TODO: What's the error handling to be here? 
+ t.close_tail() + + # Always update as we may have gone to not reading or from + # not writing to writing when processing the incoming bytes + r = s._reactor + self.update(t, s, r.now) + + def on_selectable_writable(self, event): + s = event.selectable + t = s._transport + + # If we're an acceptor we can't have a transport + # and we don't want to do anything here in any case + if not t: + return + + pending = t.pending() + if pending > 0: + + try: + n = s.send(t.peek(pending)) + t.pop(n) + except: + # TODO: Error? or actually an exception + t.close_head() + + newpending = t.pending() + if newpending != pending: + r = s._reactor + self.update(t, s, r.now) + + def on_selectable_error(self, event): + s = event.selectable + t = s._transport + + t.close_head() + t.close_tail() + s.terminate() + s.update() + + def on_selectable_expired(self, event): + s = event.selectable + t = s._transport + r = s._reactor + + self.update(t, s, r.now) + + def on_connection_local_open(self, event): + c = event.connection + if not c.state & Endpoint.REMOTE_UNINIT: + return + + t = Transport() + # It seems perverse, but the C code ignores bind errors too! + # and this is required or you get errors because Connector() has already + # bound the transport and connection! + t.bind_nothrow(c) + + def on_connection_bound(self, event): + c = event.connection + t = event.transport + + reactor = c._reactor + + # link the new transport to its reactor: + t._reactor = reactor + + if c._acceptor: + # this connection was created by the acceptor. There is already a + # socket assigned to this connection. Nothing needs to be done. + return + + url = c.url or Url(c.hostname) + url.defaults() + + host = url.host + port = url.port + + if not c.user: + user = url.username + if user: + c.user = user + password = url.password + if password: + c.password = password + + # TODO Currently this is synch and will throw if it cannot connect + # do we want to handle errors differently? or do it asynch? 
+ sock = IO.connect(host, int(port)) + + s = reactor.selectable(delegate=sock) + s._transport = t + t._selectable = s + self.update(t, s, reactor.now) + + @staticmethod + def update(transport, selectable, now): + try: + capacity = transport.capacity() + selectable.reading = capacity>0 + except: + if transport.closed: + selectable.terminate() + try: + pending = transport.pending() + selectable.writing = pending>0 + except: + if transport.closed: + selectable.terminate() + selectable.deadline = transport.tick(now) + selectable.update() + + def on_transport(self, event): + t = event.transport + r = t._reactor + s = t._selectable + if s and not s.is_terminal: + self.update(t, s, r.now) + + def on_transport_closed(self, event): + t = event.transport + r = t._reactor + s = t._selectable + s.terminate() + r.update(s) + t.unbind() diff --git a/python/proton/_io.py b/python/proton/_io.py new file mode 100644 index 0000000..401ba11 --- /dev/null +++ b/python/proton/_io.py @@ -0,0 +1,138 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from __future__ import absolute_import + +import socket +import select +import time + +PN_INVALID_SOCKET = -1 + +class IO(object): + + @staticmethod + def close(s): + s.close() + + @staticmethod + def listen(host, port): + s = socket.socket() + s.bind((host, port)) + s.listen(10) + return s + + @staticmethod + def accept(s): + return s.accept() + + @staticmethod + def connect(host, port): + return socket.create_connection((host, port)) + + @staticmethod + def select(*args, **kwargs): + return select.select(*args, **kwargs) + + @staticmethod + def sleep(t): + time.sleep(t) + return + + class Selector(object): + + def __init__(self): + self._selectables = set() + self._reading = set() + self._writing = set() + self._deadline = None + + def add(self, selectable): + self._selectables.add(selectable) + if selectable.reading: + self._reading.add(selectable) + if selectable.writing: + self._writing.add(selectable) + if selectable.deadline: + if self._deadline is None: + self._deadline = selectable.deadline + else: + self._deadline = min(selectable.deadline, self._deadline) + + def remove(self, selectable): + self._selectables.discard(selectable) + self._reading.discard(selectable) + self._writing.discard(selectable) + self.update_deadline() + + @property + def selectables(self): + return len(self._selectables) + + def update_deadline(self): + for sel in self._selectables: + if sel.deadline: + if self._deadline is None: + self._deadline = sel.deadline + else: + self._deadline = min(sel.deadline, self._deadline) + + def update(self, selectable): + self._reading.discard(selectable) + self._writing.discard(selectable) + if selectable.reading: + self._reading.add(selectable) + if selectable.writing: + self._writing.add(selectable) + self.update_deadline() + + def select(self, timeout): + + def select_inner(timeout): + r = self._reading + w = self._writing + + now = time.time() + + # No timeout or deadline + if timeout is None and self._deadline is None: + return 
IO.select(r, w, []) + + if timeout is None: + t = max(0, self._deadline - now) + return IO.select(r, w, [], t) + + if self._deadline is None: + return IO.select(r, w, [], timeout) + + t = max(0, min(timeout, self._deadline - now)) + if len(r)==0 and len(w)==0: + if t > 0: IO.sleep(t) + return ([],[],[]) + + return IO.select(r, w, [], t) + + r, w, _ = select_inner(timeout) + + # Calculate timed out selectables + now = time.time() + t = [s for s in self._selectables if s.deadline and now > s.deadline] + self._deadline = None + self.update_deadline() + return r, w, t diff --git a/python/proton/_message.py b/python/proton/_message.py index cca498f..2f3709a 100644 --- a/python/proton/_message.py +++ b/python/proton/_message.py @@ -39,7 +39,7 @@ from cproton import PN_DEFAULT_PRIORITY, PN_OVERFLOW, \ pn_inspect, pn_string, pn_string_get, pn_free, pn_error_text from . import _compat -from ._common import Constant, isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode +from ._common import isinteger, secs2millis, millis2secs, unicode2utf8, utf82unicode from ._data import Data, ulong, symbol from ._endpoints import Link from ._exceptions import EXCEPTIONS, MessageException @@ -52,7 +52,6 @@ except NameError: unicode = str - class Message(object): """The L{Message} class is a mutable holder of message content. @@ -432,7 +431,7 @@ The group-id for any replies. self.decode(dlv.encoded) return dlv - def __repr2__(self): + def __repr__(self): props = [] for attr in ("inferred", "address", "reply_to", "durable", "ttl", "priority", "first_acquirer", "delivery_count", "id", @@ -442,11 +441,3 @@ The group-id for any replies. 
value = getattr(self, attr) if value: props.append("%s=%r" % (attr, value)) return "Message(%s)" % ", ".join(props) - - def __repr__(self): - tmp = pn_string(None) - err = pn_inspect(self._msg, tmp) - result = pn_string_get(tmp) - pn_free(tmp) - self._check(err) - return result diff --git a/python/proton/_reactor.py b/python/proton/_reactor.py index a47625f..6cf5305 100644 --- a/python/proton/_reactor.py +++ b/python/proton/_reactor.py @@ -19,21 +19,16 @@ from __future__ import absolute_import +from functools import total_ordering +import heapq import json -import os import logging +import os +import time import traceback import uuid -from cproton import PN_MILLIS_MAX, PN_PYREF, PN_ACCEPTED, \ - pn_reactor_stop, pn_selectable_attachments, pn_reactor_quiesced, pn_reactor_acceptor, \ - pn_record_set_handler, pn_collector_put, pn_reactor_get_timeout, pn_task_cancel, pn_acceptor_set_ssl_domain, \ - pn_record_get, pn_reactor_selectable, pn_task_attachments, pn_reactor_schedule, pn_acceptor_close, pn_py2void, \ - pn_reactor_error, pn_reactor_attachments, pn_reactor_get_global_handler, pn_reactor_process, pn_reactor, \ - pn_reactor_set_handler, pn_reactor_set_global_handler, pn_reactor_yield, pn_error_text, pn_reactor_connection, \ - pn_cast_pn_reactor, pn_reactor_get_connection_address, pn_reactor_update, pn_reactor_collector, pn_void2py, \ - pn_reactor_start, pn_reactor_set_connection_host, pn_cast_pn_task, pn_decref, pn_reactor_set_timeout, \ - pn_reactor_mark, pn_reactor_get_handler, pn_reactor_wakeup +from cproton import PN_PYREF, PN_ACCEPTED, PN_EVENT_NONE from ._delivery import Delivery from ._endpoints import Connection, Endpoint, Link, Session, Terminus @@ -42,159 +37,175 @@ from ._data import Described, symbol, ulong from ._message import Message from ._transport import Transport, SSL, SSLDomain from ._url import Url -from ._common import isstring, secs2millis, millis2secs, unicode2utf8, utf82unicode -from ._events import EventType, EventBase, Handler -from 
._reactor_impl import Selectable, WrappedHandler, _chandler -from ._wrapper import Wrapper, PYCTX +from ._common import isstring, unicode2utf8, utf82unicode +from ._events import Collector, EventType, EventBase, Handler, Event +from ._selectable import Selectable -from ._handlers import OutgoingMessageHandler +from ._handlers import OutgoingMessageHandler, IOHandler + +from ._io import IO, PN_INVALID_SOCKET from . import _compat from ._compat import queue -Logger = logging.getLogger("proton") + +_logger = logging.getLogger("proton") def _generate_uuid(): return uuid.uuid4() -def _timeout2millis(secs): - if secs is None: return PN_MILLIS_MAX - return secs2millis(secs) +def _now(): + return time.time() +@total_ordering +class Task(object): -def _millis2timeout(millis): - if millis == PN_MILLIS_MAX: return None - return millis2secs(millis) - - -class Task(Wrapper): - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Task(impl) + def __init__(self, reactor, deadline, handler): + self._deadline = deadline + self._handler = handler + self._reactor = reactor + self._cancelled = False - def __init__(self, impl): - Wrapper.__init__(self, impl, pn_task_attachments) - - def _init(self): - pass + def __lt__(self, rhs): + return self._deadline < rhs._deadline def cancel(self): - pn_task_cancel(self._impl) + self._cancelled = True + @property + def handler(self): + return self._handler -class Acceptor(Wrapper): +class TimerSelectable(Selectable): - def __init__(self, impl): - Wrapper.__init__(self, impl) + def __init__(self, reactor, collector): + super(TimerSelectable, self).__init__(None, reactor) + self.collect(collector) + collector.put(self, Event.SELECTABLE_INIT) - def set_ssl_domain(self, ssl_domain): - pn_acceptor_set_ssl_domain(self._impl, ssl_domain._domain) + def fileno(self): + return PN_INVALID_SOCKET - def close(self): - pn_acceptor_close(self._impl) + def readable(self): + pass + def writable(self): + pass -class Reactor(Wrapper): 
+ def expired(self): + self._reactor.timer_tick() + self.deadline = self._reactor.timer_deadline + self.update() - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - record = pn_reactor_attachments(impl) - attrs = pn_void2py(pn_record_get(record, PYCTX)) - if attrs and 'subclass' in attrs: - return attrs['subclass'](impl=impl) - else: - return Reactor(impl=impl) +class Reactor(object): def __init__(self, *handlers, **kwargs): - Wrapper.__init__(self, kwargs.get("impl", pn_reactor), pn_reactor_attachments) - for h in handlers: - self.handler.add(h, on_error=self.on_error_delegate()) - - def _init(self): + self._previous = PN_EVENT_NONE + self._timeout = 0 + self.mark() + self._yield = False + self._stop = False + self._collector = Collector() + self._selectable = None + self._selectables = 0 + self._global_handler = IOHandler() + self._handler = Handler() + self._timerheap = [] + self._timers = 0 self.errors = [] - - # on_error relay handler tied to underlying C reactor. Use when the - # error will always be generated from a callback from this reactor. - # Needed to prevent reference cycles and be compatible with wrappers. - class ErrorDelegate(object): - def __init__(self, reactor): - self.reactor_impl = reactor._impl - - def on_error(self, info): - ractor = Reactor.wrap(self.reactor_impl) - ractor.on_error(info) - - def on_error_delegate(self): - return Reactor.ErrorDelegate(self).on_error + for h in handlers: + self.handler.add(h, on_error=self.on_error) def on_error(self, info): self.errors.append(info) self.yield_() + # TODO: need to make this actually return a proxy which catches exceptions and calls + # on error. + # [Or arrange another way to deal with exceptions thrown by handlers] + def _make_handler(self, handler): + """ + Return a proxy handler that dispatches to the provided handler. 
+ + If handler throws an exception then on_error is called with info + """ + return handler + def _get_global(self): - return WrappedHandler.wrap(pn_reactor_get_global_handler(self._impl), self.on_error_delegate()) + return self._global_handler def _set_global(self, handler): - impl = _chandler(handler, self.on_error_delegate()) - pn_reactor_set_global_handler(self._impl, impl) - pn_decref(impl) + self._global_handler = self._make_handler(handler) global_handler = property(_get_global, _set_global) def _get_timeout(self): - return _millis2timeout(pn_reactor_get_timeout(self._impl)) + return self._timeout def _set_timeout(self, secs): - return pn_reactor_set_timeout(self._impl, _timeout2millis(secs)) + self._timeout = secs timeout = property(_get_timeout, _set_timeout) def yield_(self): - pn_reactor_yield(self._impl) + self._yield = True def mark(self): - return pn_reactor_mark(self._impl) + """ This sets the reactor now instant to the current time """ + self._now = _now() + return self._now + + @property + def now(self): + return self._now def _get_handler(self): - return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error_delegate()) + return self._handler def _set_handler(self, handler): - impl = _chandler(handler, self.on_error_delegate()) - pn_reactor_set_handler(self._impl, impl) - pn_decref(impl) + self._handler = self._make_handler(handler) handler = property(_get_handler, _set_handler) def run(self): + # TODO: Why do we timeout like this? self.timeout = 3.14159265359 self.start() while self.process(): pass self.stop() self.process() - self.global_handler = None - self.handler = None + # TODO: This isn't correct if we ever run again + self._global_handler = None + self._handler = None + # Cross thread reactor wakeup def wakeup(self): - n = pn_reactor_wakeup(self._impl) - if n: raise IOError(pn_error_text(pn_reactor_error(self._impl))) + # TODO: Do this with pipe and write? 
+ #os.write(self._wakeup[1], "x", 1); + pass def start(self): - pn_reactor_start(self._impl) + self.push_event(self, Event.REACTOR_INIT) + self._selectable = TimerSelectable(self, self._collector) + self._selectable.deadline = self.timer_deadline + # TODO set up fd to read for wakeups - but problematic on windows + #self._selectable.fileno(self._wakeup[0]) + #self._selectable.reading = True + self.update(self._selectable) @property def quiesced(self): - return pn_reactor_quiesced(self._impl) + event = self._collector.peek() + if not event: + return True + if self._collector.more(): + return False + return event.type is Event.REACTOR_QUIESCED def _check_errors(self): + """ This """ if self.errors: for exc, value, tb in self.errors[:-1]: traceback.print_exception(exc, value, tb) @@ -202,35 +213,104 @@ class Reactor(Wrapper): _compat.raise_(exc, value, tb) def process(self): - result = pn_reactor_process(self._impl) - self._check_errors() - return result + # result = pn_reactor_process(self._impl) + # self._check_errors() + # return result + self.mark() + previous = PN_EVENT_NONE + while True: + if self._yield: + self._yield = False + _logger.debug('%s Yielding', self) + return True + event = self._collector.peek() + if event: + _logger.debug('%s recvd Event: %r', self, event) + type = event.type + + # regular handler + handler = event.handler or self._handler + event.dispatch(handler) + + event.dispatch(self._global_handler) + + previous = type + self._previous = type + self._collector.pop() + elif not self._stop and (self._timers > 0 or self._selectables > 1): + if previous is not Event.REACTOR_QUIESCED and self._previous is not Event.REACTOR_FINAL: + self.push_event(self, Event.REACTOR_QUIESCED) + self.yield_() + else: + if self._selectable: + self._selectable.terminate() + self.update(self._selectable) + self._selectable = None + else: + if self._previous is not Event.REACTOR_FINAL: + self.push_event(self, Event.REACTOR_FINAL) + _logger.debug('%s Stopping', self) 
+ return False def stop(self): - pn_reactor_stop(self._impl) + self._stop = True self._check_errors() - def schedule(self, delay, task): - impl = _chandler(task, self.on_error_delegate()) - task = Task.wrap(pn_reactor_schedule(self._impl, secs2millis(delay), impl)) - pn_decref(impl) + def stop_events(self): + self._collector.release() + + def schedule(self, delay, handler): + himpl = self._make_handler(handler) + task = Task(self, self._now+delay, himpl) + heapq.heappush(self._timerheap, task) + self._timers += 1 + deadline = self._timerheap[0]._deadline + if self._selectable: + self._selectable.deadline = deadline + self.update(self._selectable) return task + def timer_tick(self): + while self._timers > 0: + t = self._timerheap[0] + if t._cancelled: + heapq.heappop(self._timerheap) + self._timers -= 1 + elif t._deadline > self._now: + return + else: + heapq.heappop(self._timerheap) + self._timers -= 1 + self.push_event(t, Event.TIMER_TASK) + + @property + def timer_deadline(self): + while self._timers > 0: + t = self._timerheap[0] + if t._cancelled: + heapq.heappop(self._timerheap) + self._timers -= 1 + else: + return t._deadline + return None + def acceptor(self, host, port, handler=None): - impl = _chandler(handler, self.on_error_delegate()) - aimpl = pn_reactor_acceptor(self._impl, unicode2utf8(host), str(port), impl) - pn_decref(impl) - if aimpl: - return Acceptor(aimpl) + impl = self._make_handler(handler) + a = Acceptor(self, unicode2utf8(host), int(port), impl) + if a: + return a else: - raise IOError("%s (%s:%s)" % (pn_error_text(pn_reactor_error(self._impl)), host, port)) + raise IOError("%s (%s:%s)" % (str(self.errors), host, port)) def connection(self, handler=None): """Deprecated: use connection_to_host() instead """ - impl = _chandler(handler, self.on_error_delegate()) - result = Connection.wrap(pn_reactor_connection(self._impl, impl)) - if impl: pn_decref(impl) + impl = self._make_handler(handler) + result = Connection() + if impl: + result.handler = 
impl + result._reactor = self + result.collect(self._collector) return result def connection_to_host(self, host, port, handler=None): @@ -247,10 +327,7 @@ class Reactor(Wrapper): used by the reactor's iohandler to create an outgoing socket connection. This must be set prior to opening the connection. """ - pn_reactor_set_connection_host(self._impl, - connection._impl, - unicode2utf8(str(host)), - unicode2utf8(str(port))) + connection.set_address(host, port) def get_connection_address(self, connection): """This may be used to retrieve the remote peer address. @@ -258,29 +335,23 @@ class Reactor(Wrapper): address is available. Use the proton.Url class to create a Url object from the returned value. """ - _url = pn_reactor_get_connection_address(self._impl, connection._impl) + _url = connection.get_address() return utf82unicode(_url) - def selectable(self, handler=None): - impl = _chandler(handler, self.on_error_delegate()) - result = Selectable.wrap(pn_reactor_selectable(self._impl)) - if impl: - record = pn_selectable_attachments(result._impl) - pn_record_set_handler(record, impl) - pn_decref(impl) + def selectable(self, handler=None, delegate=None): + if delegate is None: + delegate = handler + result = Selectable(delegate, self) + result.collect(self._collector) + result.handler = handler + self.push_event(result, Event.SELECTABLE_INIT) return result - def update(self, sel): - pn_reactor_update(self._impl, sel._impl) + def update(self, selectable): + selectable.update() def push_event(self, obj, etype): - pn_collector_put(pn_reactor_collector(self._impl), PN_PYREF, pn_py2void(obj), etype.number) - - -from ._events import wrappers as _wrappers - -_wrappers["pn_reactor"] = lambda x: Reactor.wrap(pn_cast_pn_reactor(x)) -_wrappers["pn_task"] = lambda x: Task.wrap(pn_cast_pn_task(x)) + self._collector.put(obj, etype) class EventInjector(object): @@ -296,6 +367,7 @@ class EventInjector(object): def __init__(self): self.queue = queue.Queue() self.pipe = os.pipe() + 
self._transport = None self._closed = False def trigger(self, event): @@ -320,19 +392,19 @@ class EventInjector(object): def on_selectable_init(self, event): sel = event.context - sel.fileno(self.fileno()) + #sel.fileno(self.fileno()) sel.reading = True - event.reactor.update(sel) + sel.update() def on_selectable_readable(self, event): + s = event.context os.read(self.pipe[0], 512) while not self.queue.empty(): requested = self.queue.get() - event.reactor.push_event(requested.context, requested.type) + s.push_event(requested.context, requested.type) if self._closed: - s = event.context s.terminate() - event.reactor.update(s) + s.update() class ApplicationEvent(EventBase): @@ -342,7 +414,8 @@ class ApplicationEvent(EventBase): """ def __init__(self, typename, connection=None, session=None, link=None, delivery=None, subject=None): - super(ApplicationEvent, self).__init__(PN_PYREF, self, EventType(typename)) + super(ApplicationEvent, self).__init__(EventType(typename)) + self.clazz = PN_PYREF self.connection = connection self.session = session self.link = link @@ -355,6 +428,10 @@ class ApplicationEvent(EventBase): self.connection = self.session.connection self.subject = subject + @property + def context(self): + return self + def __repr__(self): objects = [self.connection, self.session, self.link, self.delivery, self.subject] return "%s(%s)" % (self.type, ", ".join([str(o) for o in objects if o is not None])) @@ -429,7 +506,7 @@ class Transaction(object): elif event.delivery.remote_state == Delivery.REJECTED: self.handler.on_transaction_declare_failed(event) else: - Logger.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) + _logger.warning("Unexpected outcome for declare: %s" % event.delivery.remote_state) self.handler.on_transaction_declare_failed(event) elif event.delivery == self._discharge: if event.delivery.remote_state == Delivery.REJECTED: @@ -569,7 +646,7 @@ class SessionPerConnection(object): return self._default_session -class 
GlobalOverrides(object): +class GlobalOverrides(Handler): """ Internal handler that triggers the necessary socket connect for an opened connection. @@ -587,6 +664,49 @@ class GlobalOverrides(object): return conn and hasattr(conn, '_overrides') and event.dispatch(conn._overrides) +class Acceptor(Handler): + + def __init__(self, reactor, host, port, handler=None): + self._ssl_domain = None + self._reactor = reactor + self._handler = handler + sock = IO.listen(host, port) + s = reactor.selectable(handler=self, delegate=sock) + s.reading = True + s._transport = None + self._selectable = s + reactor.update(s) + + def set_ssl_domain(self, ssl_domain): + self._ssl_domain = ssl_domain + + def close(self): + if not self._selectable.is_terminal: + IO.close(self._selectable) + self._selectable.terminate() + self._reactor.update(self._selectable) + + def on_selectable_readable(self, event): + s = event.selectable + + sock, name = IO.accept(self._selectable) + _logger.debug("Accepted connection from %s", name) + + r = self._reactor + handler = self._handler or r.handler + c = r.connection(handler) + c._acceptor = self + c.url = Url(host=name[0], port=name[1]) + t = Transport(Transport.SERVER) + if self._ssl_domain: + t.ssl(self._ssl_domain) + t.bind(c) + + s = r.selectable(delegate=sock) + s._transport = t + t._selectable = s + IOHandler.update(t, s, r.now) + class Connector(Handler): """ Internal handler that triggers the necessary socket connect for an @@ -608,14 +728,13 @@ class Connector(Handler): self.ssl_sni = None self.max_frame_size = None - def _connect(self, connection, reactor): - assert (reactor is not None) + def _connect(self, connection): url = self.address.next() - reactor.set_connection_host(connection, url.host, str(url.port)) + connection.url = url # if virtual-host not set, use host from address as default if self.virtual_host is None: connection.hostname = url.host - Logger.debug("connecting to %r..." % url) + _logger.debug("connecting to %r..." 
% url) transport = Transport() if self.sasl_enabled: @@ -643,10 +762,10 @@ class Connector(Handler): transport.max_frame_size = self.max_frame_size def on_connection_local_open(self, event): - self._connect(event.connection, event.reactor) + self._connect(event.connection) def on_connection_remote_open(self, event): - Logger.debug("connected to %s" % event.connection.hostname) + _logger.debug("connected to %s" % event.connection.hostname) if self.reconnect: self.reconnect.reset() self.transport = None @@ -661,20 +780,20 @@ class Connector(Handler): event.transport.unbind() delay = self.reconnect.next() if delay == 0: - Logger.info("Disconnected, reconnecting...") - self._connect(self.connection, event.reactor) + _logger.info("Disconnected, reconnecting...") + self._connect(self.connection) return else: - Logger.info("Disconnected will try to reconnect after %s seconds" % delay) + _logger.info("Disconnected will try to reconnect after %s seconds" % delay) event.reactor.schedule(delay, self) return else: - Logger.debug("Disconnected") + _logger.debug("Disconnected") # See connector.cpp: conn.free()/pn_connection_release() here? 
self.connection = None def on_timer_task(self, event): - self._connect(self.connection, event.reactor) + self._connect(self.connection) class Backoff(object): @@ -727,7 +846,7 @@ class SSLConfig(object): self.client.set_trusted_ca_db(certificate_db) self.server.set_trusted_ca_db(certificate_db) -def find_config_file(): +def _find_config_file(): confname = 'connect.json' confpath = ['.', '~/.config/messaging','/etc/messaging'] for d in confpath: @@ -736,15 +855,15 @@ def find_config_file(): return f return None -def get_default_config(): - conf = os.environ.get('MESSAGING_CONNECT_FILE') or find_config_file() +def _get_default_config(): + conf = os.environ.get('MESSAGING_CONNECT_FILE') or _find_config_file() if conf and os.path.isfile(conf): with open(conf, 'r') as f: return json.load(f) else: return {} -def get_default_port_for_scheme(scheme): +def _get_default_port_for_scheme(scheme): if scheme == 'amqps': return 5671 else: @@ -773,7 +892,6 @@ class Container(Reactor): self.sasl_enabled = True self.user = None self.password = None - Wrapper.__setattr__(self, 'subclass', self.__class__) def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None, ssl_domain=None, **kwargs): @@ -825,9 +943,9 @@ class Container(Reactor): """ if not url and not urls and not address: - config = get_default_config() + config = _get_default_config() scheme = config.get('scheme', 'amqp') - _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', get_default_port_for_scheme(scheme))) + _url = "%s://%s:%s" % (scheme, config.get('host', 'localhost'), config.get('port', _get_default_port_for_scheme(scheme))) _ssl_domain = None _kwargs = kwargs if config.get('user'): @@ -952,7 +1070,7 @@ class Container(Reactor): snd.source.address = source if target: snd.target.address = target - if handler != None: + if handler is not None: snd.handler = handler if tags: snd.tag_generator = tags @@ -995,7 +1113,7 @@ class Container(Reactor): 
rcv.source.dynamic = True if target: rcv.target.address = target - if handler != None: + if handler is not None: rcv.handler = handler _apply_link_options(options, rcv) rcv.open() diff --git a/python/proton/_reactor_impl.py b/python/proton/_reactor_impl.py deleted file mode 100644 index 4ffebcd..0000000 --- a/python/proton/_reactor_impl.py +++ /dev/null @@ -1,217 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -from __future__ import absolute_import - -import weakref - -from cproton import PN_INVALID_SOCKET, \ - pn_incref, pn_decref, \ - pn_handler_add, pn_handler_clear, pn_pyhandler, \ - pn_selectable_is_reading, pn_selectable_attachments, pn_selectable_set_reading, \ - pn_selectable_expired, pn_selectable_set_fd, pn_selectable_set_registered, pn_selectable_writable, \ - pn_selectable_is_writing, pn_selectable_set_deadline, pn_selectable_is_registered, pn_selectable_terminate, \ - pn_selectable_get_deadline, pn_selectable_is_terminal, pn_selectable_readable, \ - pn_selectable_release, pn_selectable_set_writing, pn_selectable_get_fd - -from ._common import millis2secs, secs2millis -from ._wrapper import Wrapper - -from . 
import _compat - -_DEFAULT = False - - -class Selectable(Wrapper): - - @staticmethod - def wrap(impl): - if impl is None: - return None - else: - return Selectable(impl) - - def __init__(self, impl): - Wrapper.__init__(self, impl, pn_selectable_attachments) - - def _init(self): - pass - - def fileno(self, fd=_DEFAULT): - if fd is _DEFAULT: - return pn_selectable_get_fd(self._impl) - elif fd is None: - pn_selectable_set_fd(self._impl, PN_INVALID_SOCKET) - else: - pn_selectable_set_fd(self._impl, fd) - - def _is_reading(self): - return pn_selectable_is_reading(self._impl) - - def _set_reading(self, val): - pn_selectable_set_reading(self._impl, bool(val)) - - reading = property(_is_reading, _set_reading) - - def _is_writing(self): - return pn_selectable_is_writing(self._impl) - - def _set_writing(self, val): - pn_selectable_set_writing(self._impl, bool(val)) - - writing = property(_is_writing, _set_writing) - - def _get_deadline(self): - tstamp = pn_selectable_get_deadline(self._impl) - if tstamp: - return millis2secs(tstamp) - else: - return None - - def _set_deadline(self, deadline): - pn_selectable_set_deadline(self._impl, secs2millis(deadline)) - - deadline = property(_get_deadline, _set_deadline) - - def readable(self): - pn_selectable_readable(self._impl) - - def writable(self): - pn_selectable_writable(self._impl) - - def expired(self): - pn_selectable_expired(self._impl) - - def _is_registered(self): - return pn_selectable_is_registered(self._impl) - - def _set_registered(self, registered): - pn_selectable_set_registered(self._impl, registered) - - registered = property(_is_registered, _set_registered, - doc=""" -The registered property may be get/set by an I/O polling system to -indicate whether the fd has been registered or not. 
-""") - - @property - def is_terminal(self): - return pn_selectable_is_terminal(self._impl) - - def terminate(self): - pn_selectable_terminate(self._impl) - - def release(self): - pn_selectable_release(self._impl) - - -class _cadapter: - - def __init__(self, handler, on_error=None): - self.handler = handler - self.on_error = on_error - - def dispatch(self, cevent, ctype): - from ._events import Event - ev = Event.wrap(cevent, ctype) - ev.dispatch(self.handler) - - def exception(self, exc, val, tb): - if self.on_error is None: - _compat.raise_(exc, val, tb) - else: - self.on_error((exc, val, tb)) - - -class WrappedHandlersChildSurrogate: - def __init__(self, delegate): - self.handlers = [] - self.delegate = weakref.ref(delegate) - - def on_unhandled(self, method, event): - from ._events import _dispatch - delegate = self.delegate() - if delegate: - _dispatch(delegate, method, event) - - -class WrappedHandlersProperty(object): - def __get__(self, obj, clazz): - if obj is None: - return None - return self.surrogate(obj).handlers - - def __set__(self, obj, value): - self.surrogate(obj).handlers = value - - def surrogate(self, obj): - key = "_surrogate" - objdict = obj.__dict__ - surrogate = objdict.get(key, None) - if surrogate is None: - objdict[key] = surrogate = WrappedHandlersChildSurrogate(obj) - obj.add(surrogate) - return surrogate - - -class WrappedHandler(Wrapper): - handlers = WrappedHandlersProperty() - - @classmethod - def wrap(cls, impl, on_error=None): - if impl is None: - return None - else: - handler = cls(impl) - handler.__dict__["on_error"] = on_error - return handler - - def __init__(self, impl_or_constructor): - Wrapper.__init__(self, impl_or_constructor) - if list(self.__class__.__mro__).index(WrappedHandler) > 1: - # instantiate the surrogate - self.handlers.extend([]) - - def _on_error(self, info): - on_error = getattr(self, "on_error", None) - if on_error is None: - _compat.raise_(info[0], info[1], info[2]) - else: - on_error(info) - - def 
add(self, handler, on_error=None): - if handler is None: return - if on_error is None: on_error = self._on_error - impl = _chandler(handler, on_error) - pn_handler_add(self._impl, impl) - pn_decref(impl) - - def clear(self): - pn_handler_clear(self._impl) - - -def _chandler(obj, on_error=None): - if obj is None: - return None - elif isinstance(obj, WrappedHandler): - impl = obj._impl - pn_incref(impl) - return impl - else: - return pn_pyhandler(_cadapter(obj, on_error)) diff --git a/python/proton/_selectable.py b/python/proton/_selectable.py new file mode 100644 index 0000000..2125f7d --- /dev/null +++ b/python/proton/_selectable.py @@ -0,0 +1,93 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +from __future__ import absolute_import + + +from ._events import Event + +class Selectable(object): + + def __init__(self, delegate, reactor): + self._delegate = delegate + self.reading = False + self.writing = False + self._deadline = 0 + self._terminal = False + self._terminated = False + self._collector = None + self._reactor = reactor + + def release(self): + if self._delegate: + self._delegate.close() + + def __getattr__(self, name): + if self._delegate: + return getattr(self._delegate, name) + else: + return None + + def _get_deadline(self): + tstamp = self._deadline + if tstamp: + return tstamp + else: + return None + + def _set_deadline(self, deadline): + if not deadline: + self._deadline = 0 + else: + self._deadline = deadline + + deadline = property(_get_deadline, _set_deadline) + + def collect(self, collector): + self._collector = collector + + def push_event(self, context, type): + self._collector.put(context, type) + + def update(self): + if not self._terminated: + if self._terminal: + self._terminated = True + self.push_event(self, Event.SELECTABLE_FINAL) + else: + self.push_event(self, Event.SELECTABLE_UPDATED) + + def readable(self): + if self._collector: + self.push_event(self, Event.SELECTABLE_READABLE) + + def writable(self): + if self._collector: + self.push_event(self, Event.SELECTABLE_WRITABLE) + + def expired(self): + if self._collector: + self.push_event(self, Event.SELECTABLE_EXPIRED) + + @property + def is_terminal(self): + return self._terminal + + def terminate(self): + self._terminal = True diff --git a/python/proton/_transport.py b/python/proton/_transport.py index 3db0078..182fde6 100644 --- a/python/proton/_transport.py +++ b/python/proton/_transport.py @@ -88,6 +88,7 @@ class Transport(Wrapper): def _init(self): self._sasl = None self._ssl = None + self._reactor = None def _check(self, err): if err < 0: @@ -136,6 +137,10 @@ A callback for trace logging. 
The callback is passed the transport and log messa """Assign a connection to the transport""" self._check(pn_transport_bind(self._impl, connection._impl)) + def bind_nothrow(self, connection): + """Assign a connection to the transport""" + pn_transport_bind(self._impl, connection._impl) + def unbind(self): """Release the connection""" self._check(pn_transport_unbind(self._impl)) diff --git a/python/proton/_utils.py b/python/proton/_utils.py index 6462b55..38639bb 100644 --- a/python/proton/_utils.py +++ b/python/proton/_utils.py @@ -23,8 +23,6 @@ import collections import time import threading -from cproton import pn_reactor_collector, pn_collector_release - from ._exceptions import ProtonException, ConnectionException, LinkException, Timeout from ._delivery import Delivery from ._endpoints import Endpoint, Link @@ -284,7 +282,7 @@ class BlockingConnection(Handler): self.run() self.conn = None self.container.global_handler = None # break circular ref: container to cadapter.on_error - pn_collector_release(pn_reactor_collector(self.container._impl)) # straggling event may keep reactor alive + self.container.stop_events() self.container = None def _is_closed(self): diff --git a/python/proton/_wrapper.py b/python/proton/_wrapper.py index 4ee98e8..1e7a33a 100644 --- a/python/proton/_wrapper.py +++ b/python/proton/_wrapper.py @@ -43,6 +43,21 @@ EMPTY_ATTRS = EmptyAttrs() class Wrapper(object): + """ Wrapper for python objects that need to be stored in event contexts and be retrieved again from them + Quick note on how this works: + The actual *python* object has only 3 attributes which redirect into the wrapped C objects: + _impl The wrapped C object itself + _attrs This is a special pn_record_t holding a PYCTX which is a python dict + every attribute in the python object is actually looked up here + _record This is the C record itself (so actually identical to _attrs really but + a different python type) + + Because the object's actual attributes are stored away they must
be initialised *after* the wrapping + is set up. This is the purpose of the _init method in the wrapped object. Wrapper.__init__ will call + the subclass _init to initialise attributes. So they *must not* be initialised in the subclass __init__ + before calling the superclass (Wrapper) __init__ or they will not be accessible from the wrapper at all. + + """ def __init__(self, impl_or_constructor, get_context=None): init = False diff --git a/python/tests/proton_tests/handler.py b/python/tests/proton_tests/handler.py index 89376ad..2324073 100644 --- a/python/tests/proton_tests/handler.py +++ b/python/tests/proton_tests/handler.py @@ -98,7 +98,7 @@ class HandlerTest(common.Test): reactor.handler.handlers.append(root) def event_root(self, event): - return event.root + return event.handler def event_reactor_handler(self, event): return event.reactor.handler diff --git a/python/tests/proton_tests/reactor.py b/python/tests/proton_tests/reactor.py index 907f1fc..923af2d 100644 --- a/python/tests/proton_tests/reactor.py +++ b/python/tests/proton_tests/reactor.py @@ -374,8 +374,8 @@ class ContainerTest(Test): def on_connection_opened(self, event): event.connection.close() - assert event.container == event.reactor - assert event.container == container + assert event.container is event.reactor + assert event.container is container container.connect(test_handler.url, handler=ConnectionHandler()) container.run() @@ -418,7 +418,7 @@ class ContainerTest(Test): self.listener = event.container.listen("%s:%s" % (self.host, self.port)) def on_connection_opened(self, event): - self.client_addr = event.reactor.get_connection_address(event.connection) + self.client_addr = event.connected_address self.peer_hostname = event.connection.remote_hostname def on_connection_closing(self, event): @@ -431,7 +431,7 @@ class ContainerTest(Test): self.server_addr = None def on_connection_opened(self, event): - self.server_addr = event.reactor.get_connection_address(event.connection) +
self.server_addr = event.connected_address event.connection.close() def test_numeric_hostname(self): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
