This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 912158e646b562ccacd815e38348c5361ac881c6 Author: Andrew Stitcher <[email protected]> AuthorDate: Tue Dec 9 11:12:15 2025 -0500 PROTON-2890: [Python] Broker should send transactioned updates For a message sent in a transaction, any accept/reject etc. disposition updates must be sent in a transactional state disposition update. See section 4.4.1 of the AMQP spec. --- python/examples/broker.py | 46 +++++++++++++++++++++++++++++++--------------- 1 file changed, 31 insertions(+), 15 deletions(-) diff --git a/python/examples/broker.py b/python/examples/broker.py index f439dfe49..cc4408421 100755 --- a/python/examples/broker.py +++ b/python/examples/broker.py @@ -27,7 +27,8 @@ import uuid from typing import Optional, Union from proton import (Condition, Delivery, Described, Disposition, DispositionType, - Endpoint, Link, Sender, Message, Terminus, DeclaredDisposition) + Endpoint, Link, Sender, Message, Terminus, DeclaredDisposition, + RejectedDisposition, TransactionalDisposition) from proton.handlers import MessagingHandler from proton.reactor import Container @@ -94,11 +95,20 @@ class QueueMessage(TransactionAction): def commit(self, broker): broker.publish(self.message, self.address) - self.delivery.update(Disposition.ACCEPTED) self.delivery.settle() def rollback(self, broker): - self.delivery.update(Disposition.RELEASED) + self.delivery.settle() + + +@dataclass +class RejectDelivery(TransactionAction): + delivery: Delivery + + def commit(self, broker): + self.delivery.settle() + + def rollback(self, broker): self.delivery.settle() @@ -303,20 +313,25 @@ class Broker(MessagingHandler): if address is None: address = message.address + ldisposition = None if address is None: - self._verbose_print("{message.id=}: Message without address") - delivery.local.condition = Condition('amqp:link:invalid-address') - delivery.update(Disposition.REJECTED) - delivery.settle() - return + self._verbose_print(f"{message.id=}: Message without address") + ldisposition = 
RejectedDisposition(Condition('amqp:link:invalid-address')) + else: + ldisposition = Disposition.ACCEPTED # Is this a transactioned message? - disposition = delivery.remote - if disposition and disposition.type == Disposition.TRANSACTIONAL_STATE: - tid = disposition.id + rdisposition = delivery.remote + if rdisposition and rdisposition.type == Disposition.TRANSACTIONAL_STATE: + tid = rdisposition.id if tid in self.txns: - self._verbose_print(f"{tid=}: Message: {message.id=}") - self.txns[tid].append(QueueMessage(delivery, message, address)) + if address: + self._verbose_print(f"{tid=}: Message: {message.id=}") + self.txns[tid].append(QueueMessage(delivery, message, address)) + else: + self.txns[tid].append(RejectDelivery(delivery)) + delivery.local = TransactionalDisposition(tid, ldisposition) + delivery.update() return else: self._verbose_print(f"{tid=}: Message: unknown txn-id") @@ -325,8 +340,9 @@ class Broker(MessagingHandler): delivery.settle() return - self.publish(message, address) - delivery.update(Disposition.ACCEPTED) + if address: + self.publish(message, address) + delivery.update(ldisposition) delivery.settle() def publish(self, message, address): --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
