This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 746b8b7722e2f7320cbf3089a741b1bc5485e235
Author: Andrew Stitcher <[email protected]>
AuthorDate: Wed Dec 17 22:53:28 2025 -0500

    PROTON-2890: Restructure broker to allow transaction timeout abort
    
    Also rollback shouldn't requeue transactionally updated outgoing
    messages.
---
 python/examples/broker.py | 144 +++++++++++++++++++++++++++++-----------------
 1 file changed, 92 insertions(+), 52 deletions(-)

diff --git a/python/examples/broker.py b/python/examples/broker.py
index 5876a6181..05354bd94 100755
--- a/python/examples/broker.py
+++ b/python/examples/broker.py
@@ -18,10 +18,13 @@
 # under the License.
 #
 
+from __future__ import annotations
+
 from abc import ABC, abstractmethod
 from collections import deque, namedtuple
 from dataclasses import dataclass
 import optparse
+import time
 import uuid
 
 from typing import Optional, Union
@@ -34,8 +37,8 @@ from proton.reactor import Container
 
 
 class Queue:
-    def __init__(self, broker: 'Broker', name: str, dynamic: bool = False):
-        self.broker = broker
+    def __init__(self, broker: Broker, name: str, dynamic: bool = False):
+        self.broker: Broker = broker
         self.name: str = name
         self.dynamic: bool = dynamic
         self.queue: deque[Message] = deque()
@@ -81,10 +84,10 @@ UnsettledDelivery = namedtuple('UnsettledDelivery', 
['address', 'message'])
 
 class TransactionAction(ABC):
     @abstractmethod
-    def commit(self, broker): ...
+    def commit(self, broker: Broker) -> None: ...
 
     @abstractmethod
-    def rollback(self, broker): ...
+    def rollback(self, broker: Broker) -> None: ...
 
 
 @dataclass
@@ -93,11 +96,11 @@ class QueueMessage(TransactionAction):
     message: Message
     address: str
 
-    def commit(self, broker):
+    def commit(self, broker: Broker) -> None:
         broker.publish(self.message, self.address)
         self.delivery.settle()
 
-    def rollback(self, broker):
+    def rollback(self, broker: Broker) -> None:
         pass
 
 
@@ -105,34 +108,61 @@ class QueueMessage(TransactionAction):
 class RejectDelivery(TransactionAction):
     delivery: Delivery
 
-    def commit(self, broker):
+    def commit(self, broker: Broker) -> None:
         self.delivery.settle()
 
-    def rollback(self, broker):
+    def rollback(self, broker: Broker) -> None:
         pass
 
 
 @dataclass
 class DeliveryUpdate(TransactionAction):
     delivery: Delivery
-    outcome: DispositionType
+    commit_outcome: DispositionType
+
+    def commit(self, broker: Broker) -> None:
+        broker.delivery_outcome(self.delivery, self.commit_outcome)
+
+    def rollback(self, broker: Broker) -> None:
+        pass
 
-    def commit(self, broker):
-        broker.delivery_update(self.delivery, self.outcome)
 
-    def rollback(self, broker):
-        broker.delivery_update(self.delivery, Disposition.RELEASED)
+class Transaction:
+    """
+    Simple container for broker-managed transactions.
+    """
+    def __init__(
+        self,
+        tid: bytes,
+        start_time: float,
+    ):
+        self.tid = tid
+        self.start_time = start_time
+        self.actions: list[TransactionAction] = []
+
+    def add_action(self, action: TransactionAction) -> None:
+        self.actions.append(action)
+
+    def commit(self, broker: Broker) -> Optional[Condition]:
+        for action in self.actions:
+            action.commit(broker)
+        return None
+
+    def rollback(self, broker: Broker) -> None:
+        for action in self.actions:
+            action.rollback(broker)
 
 
 class Broker(MessagingHandler):
-    def __init__(self, url, verbose, redelivery_limit):
+    def __init__(self, url: str, verbose: bool, redelivery_limit: int, 
txn_timeout: float) -> None:
         super().__init__(auto_accept=False)
         self.verbose = verbose
         self.url = url
         self.redelivery_limit = redelivery_limit
+        self.txn_timeout = txn_timeout
         self.queues: dict[str, Queue] = {}
         self.unsettled_deliveries: dict[Delivery, UnsettledDelivery] = {}
-        self.txns: dict[bytes, list[TransactionAction]] = {}
+        self.txns: dict[bytes, Transaction] = {}
         self.acceptor = None
 
     def _verbose_print(self, message):
@@ -166,7 +196,7 @@ class Broker(MessagingHandler):
             # requested = link.remote_target.capabilities.get_object()
             link.target.type = Terminus.COORDINATOR
             link.target.copy(link.remote_target)
-            link._txns = set()
+            link._txns: set[Transaction] = set()
         elif link.remote_target.address:
             link.target.address = link.remote_target.address
 
@@ -199,7 +229,7 @@ class Broker(MessagingHandler):
         self._verbose_print(f"{delivery.tag=}: Modified: {message.id=} 
Requeued: {address=}")
         self._queue(address).publish(message)
 
-    def delivery_update(self, delivery: Delivery, outcome: Union[int, 
DispositionType]):
+    def delivery_outcome(self, delivery: Delivery, outcome: Union[int, 
DispositionType]):
         unsettled_delivery = self.unsettled_deliveries[delivery]
         message = unsettled_delivery.message
         address = unsettled_delivery.address
@@ -218,23 +248,31 @@ class Broker(MessagingHandler):
         delivery.settle()
         del unsettled_delivery
 
-    def _declare_txn(self):
+    def _declare_txn(self) -> Transaction:
         tid = bytes(uuid.uuid4().bytes)
-        self.txns[tid] = []
-        return tid
+        txn = Transaction(tid, time.monotonic())
+        self.txns[tid] = txn
+        return txn
 
-    def _discharge_txn(self, tid, failed):
+    def _discharge_txn(self, txn: Transaction, failed) -> Optional[Condition]:
+        error = None
+        tid = txn.tid
         if not failed:
             # Commit
-            self._verbose_print(f"{tid=}: Commit")
-            for action in self.txns[tid]:
-                action.commit(self)
+            txn_time = time.monotonic() - txn.start_time
+            if self.txn_timeout > 0 and txn_time > self.txn_timeout:
+                error = Condition('amqp:transaction:timeout', f"timeout: 
{self.txn_timeout}s time: {txn_time}s")
+            else:
+                error = txn.commit(self)
+            if error is not None:
+                self._verbose_print(f"{tid=}: Commit failed: {error} -> 
rollback")
+                txn.rollback(self)
         else:
             # Rollback
             self._verbose_print(f"{tid=}: Rollback")
-            for action in self.txns[tid]:
-                action.rollback(self)
+            txn.rollback(self)
         del self.txns[tid]
+        return error
 
     def _coordinator_message(self, msg, delivery):
         body = msg.body
@@ -243,23 +281,26 @@ class Broker(MessagingHandler):
             d = body.descriptor
             if d == "amqp:declare:list":
                 # Allocate transaction id
-                tid = self._declare_txn()
-                self._verbose_print(f"{tid=}: Declare")
-                delivery.local = DeclaredDisposition(tid)
-                link._txns.add(tid)
+                txn = self._declare_txn()
+                self._verbose_print(f"{txn.tid=}: Declare")
+                delivery.local = DeclaredDisposition(txn.tid)
+                link._txns.add(txn)
             elif d == "amqp:discharge:list":
-                # Always accept commit/abort!
+                # Commit/abort the transaction (commit may fail and be 
rejected).
                 value = body.value
                 tid = bytes(value[0])
                 failed = bool(value[1])
-                if tid in link._txns:
-                    self._discharge_txn(tid, failed)
-                    delivery.update(Disposition.ACCEPTED)
-                    link._txns.remove(tid)
+                txn = self.txns.get(tid)
+                if txn is not None:
+                    error = self._discharge_txn(txn, failed)
+                    if error is None:
+                        delivery.update(Disposition.ACCEPTED)
+                    else:
+                        delivery.update(RejectedDisposition(error))
+                    link._txns.remove(txn)
                 else:
                     self._verbose_print(f"{tid=}: Discharge unknown txn-id: 
{failed=}")
-                    delivery.local.condition = 
Condition('amqp:transaction:unknown-id')
-                    delivery.update(Disposition.REJECTED)
+                    
delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id')))
         delivery.settle()
 
     def on_link_closing(self, event):
@@ -268,8 +309,8 @@ class Broker(MessagingHandler):
             self._unsubscribe(link)
         elif link.target.type == Terminus.COORDINATOR:
             # Abort any remaining active transactions
-            for tid in link._txns:
-                self._discharge_txn(tid, failed=True)
+            for txn in link._txns:
+                self._discharge_txn(txn, failed=True)
             link._txns.clear()
 
     def _remove_stale_consumers(self, connection):
@@ -281,8 +322,8 @@ class Broker(MessagingHandler):
         for link in connection.links(Endpoint.LOCAL_ACTIVE):
             if link.target.type == Terminus.COORDINATOR:
                 # Abort any remaining active transactions
-                for tid in link._txns:
-                    self._discharge_txn(tid, failed=True)
+                for txn in link._txns:
+                    self._discharge_txn(txn, failed=True)
                 link._txns.clear()
 
     def on_connection_closing(self, event):
@@ -327,16 +368,14 @@ class Broker(MessagingHandler):
             if tid in self.txns:
                 if address:
                     self._verbose_print(f"{tid=}: Message: {message.id=}")
-                    self.txns[tid].append(QueueMessage(delivery, message, 
address))
+                    self.txns[tid].add_action(QueueMessage(delivery, message, 
address))
                 else:
-                    self.txns[tid].append(RejectDelivery(delivery))
-                delivery.local = TransactionalDisposition(tid, ldisposition)
-                delivery.update()
+                    self.txns[tid].add_action(RejectDelivery(delivery))
+                delivery.update(TransactionalDisposition(tid, ldisposition))
                 return
             else:
                 self._verbose_print(f"{tid=}: Message: unknown txn-id")
-                delivery.local.condition = 
Condition('amqp:transaction:unknown-id')
-                delivery.update(Disposition.REJECTED)
+                
delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id')))
                 delivery.settle()
                 return
 
@@ -366,14 +405,13 @@ class Broker(MessagingHandler):
             outcome = disposition.outcome_type
             if tid in self.txns:
                 self._verbose_print(f"{tid=}: Delivery update: 
outcome={outcome}")
-                self.txns[tid].append(DeliveryUpdate(delivery, outcome))
+                self.txns[tid].add_action(DeliveryUpdate(delivery, outcome))
                 return
             else:
                 self._verbose_print(f"{tid=}: Delivery update: unknown txn-id")
-                delivery.local.condition = 
Condition('amqp:transaction:unknown-id')
-                delivery.update(Disposition.REJECTED)
+                
delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id')))
         else:
-            self.delivery_update(delivery, disposition.type)
+            self.delivery_outcome(delivery, disposition.type)
         # The delivery is settled in every case except a valid transaction
         # where the outcome is not yet known until the transaction is 
discharged.
         delivery.settle()
@@ -387,10 +425,12 @@ def main():
                       help="address router listens on (default %default)")
     parser.add_option("-l", "--redelivery-limit", default=5,
                       help="maximum redelivery attempts (default %default)")
+    parser.add_option("-t", "--txn-timeout", default=0,
+                      help="transaction timeout in seconds (default %default)")
     opts, args = parser.parse_args()
 
     try:
-        Container(Broker(opts.address, opts.verbose, 
opts.redelivery_limit)).run()
+        Container(Broker(opts.address, opts.verbose, 
int(opts.redelivery_limit), float(opts.txn_timeout))).run()
     except KeyboardInterrupt:
         pass
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to