This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 912158e646b562ccacd815e38348c5361ac881c6
Author: Andrew Stitcher <[email protected]>
AuthorDate: Tue Dec 9 11:12:15 2025 -0500

    PROTON-2890: [Python] Broker should send transactioned updates
    
    For messages sent in a transaction, any accept/reject etc. disposition
    updates must be sent in a transactional-state disposition update.
    See section 4.4.1 of the AMQP spec.
---
 python/examples/broker.py | 46 +++++++++++++++++++++++++++++++---------------
 1 file changed, 31 insertions(+), 15 deletions(-)

diff --git a/python/examples/broker.py b/python/examples/broker.py
index f439dfe49..cc4408421 100755
--- a/python/examples/broker.py
+++ b/python/examples/broker.py
@@ -27,7 +27,8 @@ import uuid
 from typing import Optional, Union
 
 from proton import (Condition, Delivery, Described, Disposition, 
DispositionType,
-                    Endpoint, Link, Sender, Message, Terminus, 
DeclaredDisposition)
+                    Endpoint, Link, Sender, Message, Terminus, 
DeclaredDisposition,
+                    RejectedDisposition, TransactionalDisposition)
 from proton.handlers import MessagingHandler
 from proton.reactor import Container
 
@@ -94,11 +95,20 @@ class QueueMessage(TransactionAction):
 
     def commit(self, broker):
         broker.publish(self.message, self.address)
-        self.delivery.update(Disposition.ACCEPTED)
         self.delivery.settle()
 
     def rollback(self, broker):
-        self.delivery.update(Disposition.RELEASED)
+        self.delivery.settle()
+
+
+@dataclass
+class RejectDelivery(TransactionAction):
+    delivery: Delivery
+
+    def commit(self, broker):
+        self.delivery.settle()
+
+    def rollback(self, broker):
         self.delivery.settle()
 
 
@@ -303,20 +313,25 @@ class Broker(MessagingHandler):
         if address is None:
             address = message.address
 
+        ldisposition = None
         if address is None:
-            self._verbose_print("{message.id=}: Message without address")
-            delivery.local.condition = Condition('amqp:link:invalid-address')
-            delivery.update(Disposition.REJECTED)
-            delivery.settle()
-            return
+            self._verbose_print(f"{message.id=}: Message without address")
+            ldisposition = 
RejectedDisposition(Condition('amqp:link:invalid-address'))
+        else:
+            ldisposition = Disposition.ACCEPTED
 
         # Is this a transactioned message?
-        disposition = delivery.remote
-        if disposition and disposition.type == Disposition.TRANSACTIONAL_STATE:
-            tid = disposition.id
+        rdisposition = delivery.remote
+        if rdisposition and rdisposition.type == 
Disposition.TRANSACTIONAL_STATE:
+            tid = rdisposition.id
             if tid in self.txns:
-                self._verbose_print(f"{tid=}: Message: {message.id=}")
-                self.txns[tid].append(QueueMessage(delivery, message, address))
+                if address:
+                    self._verbose_print(f"{tid=}: Message: {message.id=}")
+                    self.txns[tid].append(QueueMessage(delivery, message, 
address))
+                else:
+                    self.txns[tid].append(RejectDelivery(delivery))
+                delivery.local = TransactionalDisposition(tid, ldisposition)
+                delivery.update()
                 return
             else:
                 self._verbose_print(f"{tid=}: Message: unknown txn-id")
@@ -325,8 +340,9 @@ class Broker(MessagingHandler):
                 delivery.settle()
                 return
 
-        self.publish(message, address)
-        delivery.update(Disposition.ACCEPTED)
+        if address:
+            self.publish(message, address)
+        delivery.update(ldisposition)
         delivery.settle()
 
     def publish(self, message, address):


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to