This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 5de2b8e93300b366c1952d942897d1aa92206acb Author: Andrew Stitcher <astitc...@apache.org> AuthorDate: Fri Apr 18 18:37:57 2025 -0400 PROTON-2891: [Python] Improve typing annotations especially for Events --- python/proton/_delivery.py | 3 + python/proton/_endpoints.py | 7 +- python/proton/_events.py | 2 +- python/proton/_handlers.py | 196 ++++++++++++++++++++++++++------------------ 4 files changed, 128 insertions(+), 80 deletions(-) diff --git a/python/proton/_delivery.py b/python/proton/_delivery.py index 90bf3689f..41c39d84a 100644 --- a/python/proton/_delivery.py +++ b/python/proton/_delivery.py @@ -133,6 +133,9 @@ class Disposition: MODIFIED = DispositionType.MODIFIED TRANSACTIONAL_STATE = DispositionType.TRANSACTIONAL_STATE + @property + def type(self) -> Union[int, DispositionType]: ... + class RemoteDisposition(Disposition): diff --git a/python/proton/_endpoints.py b/python/proton/_endpoints.py index 2559a4dd5..21171b16d 100644 --- a/python/proton/_endpoints.py +++ b/python/proton/_endpoints.py @@ -65,7 +65,7 @@ from ._transport import Transport from ._wrapper import Wrapper from collections.abc import Iterator -from typing import Any, Optional, Union, TYPE_CHECKING +from typing import Any, Optional, Union, TYPE_CHECKING, overload if TYPE_CHECKING: from ._condition import Condition @@ -1170,6 +1170,11 @@ class Sender(Link): """ return self._check(pn_link_send(self._impl, data)) + @overload + def send(self, obj: bytes) -> int: ... + @overload + def send(self, obj: 'Message', tag: Optional[str] = None) -> Delivery: ... + def send(self, obj: Union[bytes, 'Message'], tag: Optional[str] = None) -> Union[int, Delivery]: """ A convenience method to send objects as message content. diff --git a/python/proton/_events.py b/python/proton/_events.py index 55c5d7717..e9995b9dc 100644 --- a/python/proton/_events.py +++ b/python/proton/_events.py @@ -556,7 +556,7 @@ class Event(EventBase): return self._session @property - def link(self) -> Optional[Union['Receiver', 'Sender']]: + def link(self) -> Optional[Link]: """ The link associated with the event, or ``None`` if none is associated with it. diff --git a/python/proton/_handlers.py b/python/proton/_handlers.py index 819f8431c..79a6f3df5 100644 --- a/python/proton/_handlers.py +++ b/python/proton/_handlers.py @@ -24,8 +24,8 @@ import time import weakref from ._condition import Condition -from ._delivery import Delivery, ModifiedDisposition -from ._endpoints import Endpoint +from ._delivery import Delivery, DispositionType, ModifiedDisposition +from ._endpoints import Connection, Endpoint, Link, Receiver, Session from ._events import Event, _dispatch from ._exceptions import ProtonException from ._handler import Handler @@ -34,16 +34,56 @@ from ._message import Message from ._selectable import Selectable from ._transport import Transport from ._url import Url -from typing import Any, Optional, Union, TYPE_CHECKING + +from typing import Any, Optional, TYPE_CHECKING if TYPE_CHECKING: - from ._delivery import DispositionType from ._reactor import Container, Transaction - from ._endpoints import Sender, Receiver log = logging.getLogger("proton") +class ConnectionEvent(Event): + @property + def connection(self) -> Connection: ... + + +class SessionEvent(ConnectionEvent): + @property + def session(self) -> Session: ... + + +class LinkEvent(SessionEvent): + @property + def link(self) -> Link: ... + + +class DeliveryEvent(LinkEvent): + @property + def delivery(self) -> Delivery: ... + + +class MessageEvent(DeliveryEvent): + @property + def message(self) -> Message: ... + + @property + def receiver(self) -> Receiver: ... + + +class TransportEvent(Event): + @property + def transport(self) -> Transport: ... + + +class ConnectionBoundEvent(Event): + @property + def connection(self) -> Connection: ... + + @property + def transport(self) -> Transport: ... + + class OutgoingMessageHandler(Handler): """ A utility for simpler and more intuitive handling of delivery @@ -60,13 +100,13 @@ class OutgoingMessageHandler(Handler): self.auto_settle = auto_settle self.delegate = delegate - def on_link_flow(self, event: Event): + def on_link_flow(self, event: LinkEvent): if event.link.is_sender and event.link.credit \ and event.link.state & Endpoint.LOCAL_ACTIVE \ and event.link.state & Endpoint.REMOTE_ACTIVE: self.on_sendable(event) - def on_delivery(self, event: Event): + def on_delivery(self, event: DeliveryEvent): dlv = event.delivery if dlv.link.is_sender and dlv.updated: if dlv.remote_state == Delivery.ACCEPTED: @@ -80,7 +120,7 @@ class OutgoingMessageHandler(Handler): if self.auto_settle: dlv.settle() - def on_sendable(self, event: Event): + def on_sendable(self, event: LinkEvent): """ Called when the sender link has credit and messages can therefore be transferred. @@ -91,7 +131,7 @@ class OutgoingMessageHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_sendable', event) - def on_accepted(self, event: Event): + def on_accepted(self, event: DeliveryEvent): """ Called when the remote peer accepts an outgoing message. @@ -101,7 +141,7 @@ class OutgoingMessageHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_accepted', event) - def on_rejected(self, event: Event): + def on_rejected(self, event: DeliveryEvent): """ Called when the remote peer rejects an outgoing message. @@ -111,7 +151,7 @@ class OutgoingMessageHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_rejected', event) - def on_released(self, event: Event): + def on_released(self, event: DeliveryEvent): """ Called when the remote peer releases an outgoing message. Note that this may be in response to either the ``RELEASE`` or ``MODIFIED`` @@ -123,7 +163,7 @@ class OutgoingMessageHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_released', event) - def on_settled(self, event: Event): + def on_settled(self, event: DeliveryEvent): """ Called when the remote peer has settled the outgoing message. This is the point at which it should never be @@ -239,7 +279,7 @@ class IncomingMessageHandler(Handler, Acking): self.delegate = delegate self.auto_accept = auto_accept - def on_delivery(self, event: Event) -> None: + def on_delivery(self, event: DeliveryEvent) -> None: dlv = event.delivery if not dlv.link.is_receiver: return @@ -247,13 +287,13 @@ class IncomingMessageHandler(Handler, Acking): self.on_aborted(event) dlv.settle() elif dlv.readable and not dlv.partial: - event.message = recv_msg(dlv) if event.link.state & Endpoint.LOCAL_CLOSED: if self.auto_accept: dlv.update(Delivery.RELEASED) dlv.settle() else: try: + event.message = recv_msg(dlv) self.on_message(event) if self.auto_accept: dlv.update(Delivery.ACCEPTED) @@ -267,7 +307,7 @@ class IncomingMessageHandler(Handler, Acking): elif dlv.updated and dlv.settled: self.on_settled(event) - def on_message(self, event: Event): + def on_message(self, event: MessageEvent): """ Called when a message is received. The message itself can be obtained as a property on the event. For the purpose of @@ -281,7 +321,7 @@ class IncomingMessageHandler(Handler, Acking): if self.delegate is not None: _dispatch(self.delegate, 'on_message', event) - def on_settled(self, event: Event): + def on_settled(self, event: DeliveryEvent): """ Callback for when a message delivery is settled by the remote peer. @@ -291,7 +331,7 @@ class IncomingMessageHandler(Handler, Acking): if self.delegate is not None: _dispatch(self.delegate, 'on_settled', event) - def on_aborted(self, event: Event): + def on_aborted(self, event: DeliveryEvent): """ Callback for when a message delivery is aborted by the remote peer. @@ -398,7 +438,7 @@ class EndpointStateHandler(Handler): elif cls.is_local_open(endpoint) and cls.is_remote_closed(endpoint): log.error("%s closed by peer" % endpoint_type) - def on_link_remote_close(self, event: Event) -> None: + def on_link_remote_close(self, event: LinkEvent) -> None: if event.link.remote_condition: self.on_link_error(event) elif self.is_local_closed(event.link): @@ -407,11 +447,11 @@ class EndpointStateHandler(Handler): self.on_link_closing(event) event.link.close() - def on_link_local_close(self, event: Event) -> None: + def on_link_local_close(self, event: LinkEvent) -> None: if self.is_remote_closed(event.link): self.on_link_closed(event) - def on_session_remote_close(self, event: Event) -> None: + def on_session_remote_close(self, event: SessionEvent) -> None: if event.session.remote_condition: self.on_session_error(event) elif self.is_local_closed(event.session): @@ -420,11 +460,11 @@ class EndpointStateHandler(Handler): self.on_session_closing(event) event.session.close() - def on_session_local_close(self, event: Event) -> None: + def on_session_local_close(self, event: SessionEvent) -> None: if self.is_remote_closed(event.session): self.on_session_closed(event) - def on_connection_remote_close(self, event: Event) -> None: + def on_connection_remote_close(self, event: ConnectionEvent) -> None: if event.connection.remote_condition: if event.connection.remote_condition.name == "amqp:connection:forced": # Treat this the same as just having the transport closed by the peer without @@ -437,44 +477,44 @@ class EndpointStateHandler(Handler): self.on_connection_closing(event) event.connection.close() - def on_connection_local_close(self, event: Event) -> None: + def on_connection_local_close(self, event: ConnectionEvent) -> None: if self.is_remote_closed(event.connection): self.on_connection_closed(event) - def on_connection_local_open(self, event: Event) -> None: + def on_connection_local_open(self, event: ConnectionEvent) -> None: if self.is_remote_open(event.connection): self.on_connection_opened(event) - def on_connection_remote_open(self, event: Event) -> None: + def on_connection_remote_open(self, event: ConnectionEvent) -> None: if self.is_local_open(event.connection): self.on_connection_opened(event) elif self.is_local_uninitialised(event.connection): self.on_connection_opening(event) event.connection.open() - def on_session_local_open(self, event: Event) -> None: + def on_session_local_open(self, event: SessionEvent) -> None: if self.is_remote_open(event.session): self.on_session_opened(event) - def on_session_remote_open(self, event: Event) -> None: + def on_session_remote_open(self, event: SessionEvent) -> None: if self.is_local_open(event.session): self.on_session_opened(event) elif self.is_local_uninitialised(event.session): self.on_session_opening(event) event.session.open() - def on_link_local_open(self, event: Event) -> None: + def on_link_local_open(self, event: LinkEvent) -> None: if self.is_remote_open(event.link): self.on_link_opened(event) - def on_link_remote_open(self, event: Event) -> None: + def on_link_remote_open(self, event: LinkEvent) -> None: if self.is_local_open(event.link): self.on_link_opened(event) elif self.is_local_uninitialised(event.link): self.on_link_opening(event) event.link.open() - def on_connection_opened(self, event: Event) -> None: + def on_connection_opened(self, event: ConnectionEvent) -> None: """ Callback for when both the local and remote endpoints of a connection have opened. @@ -485,7 +525,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_connection_opened', event) - def on_session_opened(self, event: Event) -> None: + def on_session_opened(self, event: SessionEvent) -> None: """ Callback for when both the local and remote endpoints of a session have opened. @@ -496,7 +536,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_session_opened', event) - def on_link_opened(self, event: Event) -> None: + def on_link_opened(self, event: LinkEvent) -> None: """ Callback for when both the local and remote endpoints of a link have opened. @@ -507,7 +547,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_link_opened', event) - def on_connection_opening(self, event: Event) -> None: + def on_connection_opening(self, event: ConnectionEvent) -> None: """ Callback for when a remote peer initiates the opening of a connection. @@ -518,7 +558,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_connection_opening', event) - def on_session_opening(self, event: Event) -> None: + def on_session_opening(self, event: SessionEvent) -> None: """ Callback for when a remote peer initiates the opening of a session. @@ -529,7 +569,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_session_opening', event) - def on_link_opening(self, event: Event) -> None: + def on_link_opening(self, event: LinkEvent) -> None: """ Callback for when a remote peer initiates the opening of a link. @@ -540,7 +580,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_link_opening', event) - def on_connection_error(self, event: Event) -> None: + def on_connection_error(self, event: ConnectionEvent) -> None: """ Callback for when an initiated connection open fails. @@ -552,7 +592,7 @@ class EndpointStateHandler(Handler): else: self.print_error(event.connection, "connection") - def on_session_error(self, event: Event) -> None: + def on_session_error(self, event: SessionEvent) -> None: """ Callback for when an initiated session open fails. @@ -565,7 +605,7 @@ class EndpointStateHandler(Handler): self.print_error(event.session, "session") event.connection.close() - def on_link_error(self, event: Event) -> None: + def on_link_error(self, event: LinkEvent) -> None: """ Callback for when an initiated link open fails. @@ -578,7 +618,7 @@ class EndpointStateHandler(Handler): self.print_error(event.link, "link") event.connection.close() - def on_connection_closed(self, event: Event) -> None: + def on_connection_closed(self, event: ConnectionEvent) -> None: """ Callback for when both the local and remote endpoints of a connection have closed. @@ -589,7 +629,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_connection_closed', event) - def on_session_closed(self, event: Event) -> None: + def on_session_closed(self, event: SessionEvent) -> None: """ Callback for when both the local and remote endpoints of a session have closed. @@ -600,7 +640,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_session_closed', event) - def on_link_closed(self, event: Event) -> None: + def on_link_closed(self, event: LinkEvent) -> None: """ Callback for when both the local and remote endpoints of a link have closed. @@ -611,7 +651,7 @@ class EndpointStateHandler(Handler): if self.delegate is not None: _dispatch(self.delegate, 'on_link_closed', event) - def on_connection_closing(self, event: Event) -> None: + def on_connection_closing(self, event: ConnectionEvent) -> None: """ Callback for when a remote peer initiates the closing of a connection. @@ -624,7 +664,7 @@ class EndpointStateHandler(Handler): elif self.peer_close_is_error: self.on_connection_error(event) - def on_session_closing(self, event: Event) -> None: + def on_session_closing(self, event: SessionEvent) -> None: """ Callback for when a remote peer initiates the closing of a session. @@ -637,7 +677,7 @@ class EndpointStateHandler(Handler): elif self.peer_close_is_error: self.on_session_error(event) - def on_link_closing(self, event: Event) -> None: + def on_link_closing(self, event: LinkEvent) -> None: """ Callback for when a remote peer initiates the closing of a link. @@ -650,7 +690,7 @@ class EndpointStateHandler(Handler): elif self.peer_close_is_error: self.on_link_error(event) - def on_transport_tail_closed(self, event: Event) -> None: + def on_transport_tail_closed(self, event: TransportEvent) -> None: """ Callback for when the transport tail has closed (ie no further input will be accepted by the transport). @@ -660,7 +700,7 @@ class EndpointStateHandler(Handler): """ self.on_transport_closed(event) - def on_transport_closed(self, event: Event) -> None: + def on_transport_closed(self, event: TransportEvent) -> None: """ Callback for when the transport has closed - ie both the head (input) and tail (output) of the transport pipeline are closed. @@ -704,7 +744,7 @@ class MessagingHandler(Handler, Acking): self.handlers.append(OutgoingMessageHandler(auto_settle, weakref.proxy(self))) self.fatal_conditions = ["amqp:unauthorized-access"] - def on_transport_error(self, event: Event) -> None: + def on_transport_error(self, event: TransportEvent) -> None: """ Called when some error is encountered with the transport over which the AMQP connection is to be established. This includes @@ -725,7 +765,7 @@ class MessagingHandler(Handler, Acking): else: logging.error("Unspecified transport error") - def on_connection_error(self, event: Event) -> None: + def on_connection_error(self, event: ConnectionEvent) -> None: """ Called when the peer closes the connection with an error condition. @@ -735,7 +775,7 @@ class MessagingHandler(Handler, Acking): """ EndpointStateHandler.print_error(event.connection, "connection") - def on_session_error(self, event: Event) -> None: + def on_session_error(self, event: SessionEvent) -> None: """ Called when the peer closes the session with an error condition. @@ -745,7 +785,7 @@ class MessagingHandler(Handler, Acking): EndpointStateHandler.print_error(event.session, "session") event.connection.close() - def on_link_error(self, event: Event) -> None: + def on_link_error(self, event: LinkEvent) -> None: """ Called when the peer closes the link with an error condition. @@ -775,7 +815,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_connection_closed(self, event: Event) -> None: + def on_connection_closed(self, event: ConnectionEvent) -> None: """ Called when the connection is closed. @@ -784,7 +824,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_session_closed(self, event: Event) -> None: + def on_session_closed(self, event: SessionEvent) -> None: """ Called when the session is closed. @@ -793,7 +833,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_link_closed(self, event: Event) -> None: + def on_link_closed(self, event: LinkEvent) -> None: """ Called when the link is closed. @@ -802,7 +842,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_connection_closing(self, event: Event) -> None: + def on_connection_closing(self, event: ConnectionEvent) -> None: """ Called when the peer initiates the closing of the connection. @@ -811,7 +851,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_session_closing(self, event: Event) -> None: + def on_session_closing(self, event: SessionEvent) -> None: """ Called when the peer initiates the closing of the session. @@ -820,7 +860,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_link_closing(self, event: Event) -> None: + def on_link_closing(self, event: LinkEvent) -> None: """ Called when the peer initiates the closing of the link. @@ -829,7 +869,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_disconnected(self, event: Event) -> None: + def on_disconnected(self, event: TransportEvent) -> None: """ Called when the socket is disconnected. @@ -838,7 +878,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_sendable(self, event: Event) -> None: + def on_sendable(self, event: LinkEvent) -> None: """ Called when the sender link has credit and messages can therefore be transferred. @@ -848,7 +888,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_accepted(self, event: Event) -> None: + def on_accepted(self, event: DeliveryEvent) -> None: """ Called when the remote peer accepts an outgoing message. @@ -857,7 +897,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_rejected(self, event: Event) -> None: + def on_rejected(self, event: DeliveryEvent) -> None: """ Called when the remote peer rejects an outgoing message. @@ -866,7 +906,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_released(self, event: Event) -> None: + def on_released(self, event: DeliveryEvent) -> None: """ Called when the remote peer releases an outgoing message. Note that this may be in response to either the RELEASE or MODIFIED @@ -877,7 +917,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_settled(self, event: Event) -> None: + def on_settled(self, event: DeliveryEvent) -> None: """ Called when the remote peer has settled the outgoing message. This is the point at which it should never be @@ -888,7 +928,7 @@ class MessagingHandler(Handler, Acking): """ pass - def on_message(self, event: Event) -> None: + def on_message(self, event: MessageEvent) -> None: """ Called when a message is received. The message itself can be obtained as a property on the event. For the purpose of @@ -1008,19 +1048,19 @@ class FlowController(Handler): self._window = window self._drained = 0 - def on_link_local_open(self, event: Event) -> None: + def on_link_local_open(self, event: LinkEvent) -> None: self._flow(event.link) - def on_link_remote_open(self, event: Event) -> None: + def on_link_remote_open(self, event: LinkEvent) -> None: self._flow(event.link) - def on_link_flow(self, event: Event) -> None: + def on_link_flow(self, event: LinkEvent) -> None: self._flow(event.link) - def on_delivery(self, event: Event) -> None: + def on_delivery(self, event: LinkEvent) -> None: self._flow(event.link) - def _flow(self, link: Union['Sender', 'Receiver']) -> None: + def _flow(self, link: Link) -> None: if link.is_receiver: self._drained += link.drained() if self._drained == 0: @@ -1031,19 +1071,19 @@ class FlowController(Handler): class Handshaker(Handler): @staticmethod - def on_connection_remote_open(event: Event) -> None: + def on_connection_remote_open(event: ConnectionEvent) -> None: conn = event.connection if conn.state & Endpoint.LOCAL_UNINIT: conn.open() @staticmethod - def on_session_remote_open(event: Event) -> None: + def on_session_remote_open(event: SessionEvent) -> None: ssn = event.session if ssn.state & Endpoint.LOCAL_UNINIT: ssn.open() @staticmethod - def on_link_remote_open(event: Event) -> None: + def on_link_remote_open(event: LinkEvent) -> None: link = event.link if link.state & Endpoint.LOCAL_UNINIT: link.source.copy(link.remote_source) @@ -1051,19 +1091,19 @@ class Handshaker(Handler): link.open() @staticmethod - def on_connection_remote_close(event: Event) -> None: + def on_connection_remote_close(event: ConnectionEvent) -> None: conn = event.connection if not conn.state & Endpoint.LOCAL_CLOSED: conn.close() @staticmethod - def on_session_remote_close(event: Event) -> None: + def on_session_remote_close(event: SessionEvent) -> None: ssn = event.session if not ssn.state & Endpoint.LOCAL_CLOSED: ssn.close() @staticmethod - def on_link_remote_close(event: Event) -> None: + def on_link_remote_close(event: LinkEvent) -> None: link = event.link if not link.state & Endpoint.LOCAL_CLOSED: link.close() @@ -1250,7 +1290,7 @@ class IOHandler(Handler): self.update(t, s, r.now) - def on_connection_local_open(self, event: Event) -> None: + def on_connection_local_open(self, event: ConnectionEvent) -> None: c = event.connection if not c.state & Endpoint.REMOTE_UNINIT: return @@ -1261,7 +1301,7 @@ class IOHandler(Handler): # bound the transport and connection! t.bind_nothrow(c) - def on_connection_bound(self, event: Event) -> None: + def on_connection_bound(self, event: ConnectionBoundEvent) -> None: c = event.connection t = event.transport @@ -1322,14 +1362,14 @@ class IOHandler(Handler): selectable.deadline = transport.tick(now) selectable.update() - def on_transport(self, event: Event) -> None: + def on_transport(self, event: TransportEvent) -> None: t = event.transport r = t._reactor s = t._selectable if s and not s.is_terminal: self.update(t, s, r.now) - def on_transport_closed(self, event: Event) -> None: + def on_transport_closed(self, event: TransportEvent) -> None: t = event.transport r = t._reactor s = t._selectable --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org