This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit b053795579f4269fdbd956efe5e06a2a3445e671
Author: Andrew Stitcher <[email protected]>
AuthorDate: Tue Dec 9 16:15:03 2025 -0500

    PROTON-2890: Elaborate python transaction send example...

    ... to more correctly handle transactioned disposition updates to the messages it sends. Including handling erroneous disposition updates from buggy brokers.
---
 python/examples/tx_send.py | 47 ++++++++++++++++++++++++++++++++++------
 1 file changed, 35 insertions(+), 12 deletions(-)

diff --git a/python/examples/tx_send.py b/python/examples/tx_send.py
index 8ba663819..a42f192a6 100755
--- a/python/examples/tx_send.py
+++ b/python/examples/tx_send.py
@@ -19,7 +19,7 @@
 #
 
 import optparse
-from proton import Message, Url
+from proton import Disposition, Message, Url
 from proton.reactor import Container
 from proton.handlers import MessagingHandler, TransactionHandler
 
@@ -28,9 +28,10 @@ class TxSend(MessagingHandler, TransactionHandler):
     def __init__(self, url, messages, batch_size):
         super(TxSend, self).__init__()
         self.url = Url(url)
+        self.msg_id = 0
+        self.accepted = 0
         self.current_batch = 0
         self.committed = 0
-        self.confirmed = 0
         self.total = messages
         self.batch_size = batch_size
 
@@ -49,28 +50,50 @@ class TxSend(MessagingHandler, TransactionHandler):
         self.send()
 
     def send(self):
-        while self.transaction and self.sender.credit and (self.committed + self.current_batch) < self.total:
-            seq = self.committed + self.current_batch + 1
-            msg = Message(id=seq, body={'sequence': seq})
+        while self.transaction and self.sender.credit and self.current_batch < self.batch_size:
+            self.msg_id += 1
+            msg = Message(id=self.msg_id, body={'sequence': self.msg_id})
             self.transaction.send(self.sender, msg)
             self.current_batch += 1
-            if self.current_batch == self.batch_size:
-                self.transaction.commit()
-                self.transaction = None
 
-    def on_accepted(self, event):
-        if event.sender == self.sender:
-            self.confirmed += 1
+    def on_delivery_updated(self, event):
+        """Transactional deliveries are updated via transactional state updates"""
+        delivery = event.delivery
+        disposition = delivery.remote
+        # Is this a transactioned delivery update?
+        if disposition.type == Disposition.TRANSACTIONAL_STATE:
+            tid = disposition.id
+            outcome = disposition.outcome_type
+            if outcome == Disposition.ACCEPTED:
+                self.accepted += 1
+                if self.accepted == self.batch_size:
+                    self.transaction.commit()
+                    self.transaction = None
+            else:
+                print(f"delivery {delivery.tag} not accepted - rollback transaction {tid}")
+                if self.transaction and self.transaction.id == tid:
+                    self.transaction.abort()
+                    self.transaction = None
+        elif event.sender == self.sender:
+            print(f"delivery {delivery.tag} - Unexpected non-transactional update - aborting transaction")
+            if self.transaction:
+                self.transaction.abort()
+                self.transaction = None
 
     def on_transaction_committed(self, event):
         self.committed += self.current_batch
         if self.committed == self.total:
-            print("all messages committed")
+            print(f"{self.committed} messages committed")
             event.connection.close()
         else:
+            self.accepted = 0
             self.current_batch = 0
             self.container.declare_transaction(self.conn, handler=self)
 
+    def on_transaction_aborted(self, event):
+        print(f"{self.committed} committed, transaction aborted - closing")
+        event.connection.close()
+
     def on_disconnected(self, event):
         self.current_batch = 0
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
