This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit b053795579f4269fdbd956efe5e06a2a3445e671
Author: Andrew Stitcher <[email protected]>
AuthorDate: Tue Dec 9 16:15:03 2025 -0500

    PROTON-2890: Elaborate python transaction send example...
    
    ... to more correctly handle transactioned disposition updates to the
    messages it sends. Including handling erroneous disposition updates from
    buggy brokers.
---
 python/examples/tx_send.py | 47 ++++++++++++++++++++++++++++++++++------------
 1 file changed, 35 insertions(+), 12 deletions(-)

diff --git a/python/examples/tx_send.py b/python/examples/tx_send.py
index 8ba663819..a42f192a6 100755
--- a/python/examples/tx_send.py
+++ b/python/examples/tx_send.py
@@ -19,7 +19,7 @@
 #
 
 import optparse
-from proton import Message, Url
+from proton import Disposition, Message, Url
 from proton.reactor import Container
 from proton.handlers import MessagingHandler, TransactionHandler
 
@@ -28,9 +28,10 @@ class TxSend(MessagingHandler, TransactionHandler):
     def __init__(self, url, messages, batch_size):
         super(TxSend, self).__init__()
         self.url = Url(url)
+        self.msg_id = 0
+        self.accepted = 0
         self.current_batch = 0
         self.committed = 0
-        self.confirmed = 0
         self.total = messages
         self.batch_size = batch_size
 
@@ -49,28 +50,50 @@ class TxSend(MessagingHandler, TransactionHandler):
         self.send()
 
     def send(self):
-        while self.transaction and self.sender.credit and (self.committed + 
self.current_batch) < self.total:
-            seq = self.committed + self.current_batch + 1
-            msg = Message(id=seq, body={'sequence': seq})
+        while self.transaction and self.sender.credit and self.current_batch < 
self.batch_size:
+            self.msg_id += 1
+            msg = Message(id=self.msg_id, body={'sequence': self.msg_id})
             self.transaction.send(self.sender, msg)
             self.current_batch += 1
-            if self.current_batch == self.batch_size:
-                self.transaction.commit()
-                self.transaction = None
 
-    def on_accepted(self, event):
-        if event.sender == self.sender:
-            self.confirmed += 1
+    def on_delivery_updated(self, event):
+        """Transactional deliveries are updated via transactional state 
updates"""
+        delivery = event.delivery
+        disposition = delivery.remote
+        # Is this a transactioned delivery update?
+        if disposition.type == Disposition.TRANSACTIONAL_STATE:
+            tid = disposition.id
+            outcome = disposition.outcome_type
+            if outcome == Disposition.ACCEPTED:
+                self.accepted += 1
+                if self.accepted == self.batch_size:
+                    self.transaction.commit()
+                    self.transaction = None
+            else:
+                print(f"delivery {delivery.tag} not accepted - rollback 
transaction {tid}")
+                if self.transaction and self.transaction.id == tid:
+                    self.transaction.abort()
+                    self.transaction = None
+        elif event.sender == self.sender:
+            print(f"delivery {delivery.tag} - Unexpected non-transactional 
update - aborting transaction")
+            if self.transaction:
+                self.transaction.abort()
+                self.transaction = None
 
     def on_transaction_committed(self, event):
         self.committed += self.current_batch
         if self.committed == self.total:
-            print("all messages committed")
+            print(f"{self.committed} messages committed")
             event.connection.close()
         else:
+            self.accepted = 0
             self.current_batch = 0
             self.container.declare_transaction(self.conn, handler=self)
 
+    def on_transaction_aborted(self, event):
+        print(f"{self.committed} committed, transaction aborted - closing")
+        event.connection.close()
+
     def on_disconnected(self, event):
         self.current_batch = 0
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to