This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 746b8b7722e2f7320cbf3089a741b1bc5485e235 Author: Andrew Stitcher <[email protected]> AuthorDate: Wed Dec 17 22:53:28 2025 -0500 PROTON-2890: Restructure broker to allow transaction timeout abort Also rollback shouldn't requeue transactionally updated outgoing messages. --- python/examples/broker.py | 144 +++++++++++++++++++++++++++++----------------- 1 file changed, 92 insertions(+), 52 deletions(-) diff --git a/python/examples/broker.py b/python/examples/broker.py index 5876a6181..05354bd94 100755 --- a/python/examples/broker.py +++ b/python/examples/broker.py @@ -18,10 +18,13 @@ # under the License. # +from __future__ import annotations + from abc import ABC, abstractmethod from collections import deque, namedtuple from dataclasses import dataclass import optparse +import time import uuid from typing import Optional, Union @@ -34,8 +37,8 @@ from proton.reactor import Container class Queue: - def __init__(self, broker: 'Broker', name: str, dynamic: bool = False): - self.broker = broker + def __init__(self, broker: Broker, name: str, dynamic: bool = False): + self.broker: Broker = broker self.name: str = name self.dynamic: bool = dynamic self.queue: deque[Message] = deque() @@ -81,10 +84,10 @@ UnsettledDelivery = namedtuple('UnsettledDelivery', ['address', 'message']) class TransactionAction(ABC): @abstractmethod - def commit(self, broker): ... + def commit(self, broker: Broker) -> None: ... @abstractmethod - def rollback(self, broker): ... + def rollback(self, broker: Broker) -> None: ... 
@dataclass @@ -93,11 +96,11 @@ class QueueMessage(TransactionAction): message: Message address: str - def commit(self, broker): + def commit(self, broker: Broker) -> None: broker.publish(self.message, self.address) self.delivery.settle() - def rollback(self, broker): + def rollback(self, broker: Broker) -> None: pass @@ -105,34 +108,61 @@ class QueueMessage(TransactionAction): class RejectDelivery(TransactionAction): delivery: Delivery - def commit(self, broker): + def commit(self, broker: Broker) -> None: self.delivery.settle() - def rollback(self, broker): + def rollback(self, broker: Broker) -> None: pass @dataclass class DeliveryUpdate(TransactionAction): delivery: Delivery - outcome: DispositionType + commit_outcome: DispositionType + + def commit(self, broker: Broker) -> None: + broker.delivery_outcome(self.delivery, self.commit_outcome) + + def rollback(self, broker: Broker) -> None: + pass - def commit(self, broker): - broker.delivery_update(self.delivery, self.outcome) - def rollback(self, broker): - broker.delivery_update(self.delivery, Disposition.RELEASED) +class Transaction: + """ + Simple container for broker-managed transactions. 
+ """ + def __init__( + self, + tid: bytes, + start_time: float, + ): + self.tid = tid + self.start_time = start_time + self.actions: list[TransactionAction] = [] + + def add_action(self, action: TransactionAction) -> None: + self.actions.append(action) + + def commit(self, broker: Broker) -> Optional[Condition]: + for action in self.actions: + action.commit(broker) + return None + + def rollback(self, broker: Broker) -> None: + for action in self.actions: + action.rollback(broker) class Broker(MessagingHandler): - def __init__(self, url, verbose, redelivery_limit): + def __init__(self, url: str, verbose: bool, redelivery_limit: int, txn_timeout: float) -> None: super().__init__(auto_accept=False) self.verbose = verbose self.url = url self.redelivery_limit = redelivery_limit + self.txn_timeout = txn_timeout self.queues: dict[str, Queue] = {} self.unsettled_deliveries: dict[Delivery, UnsettledDelivery] = {} - self.txns: dict[bytes, list[TransactionAction]] = {} + self.txns: dict[bytes, Transaction] = {} self.acceptor = None def _verbose_print(self, message): @@ -166,7 +196,7 @@ class Broker(MessagingHandler): # requested = link.remote_target.capabilities.get_object() link.target.type = Terminus.COORDINATOR link.target.copy(link.remote_target) - link._txns = set() + link._txns: set[Transaction] = set() elif link.remote_target.address: link.target.address = link.remote_target.address @@ -199,7 +229,7 @@ class Broker(MessagingHandler): self._verbose_print(f"{delivery.tag=}: Modified: {message.id=} Requeued: {address=}") self._queue(address).publish(message) - def delivery_update(self, delivery: Delivery, outcome: Union[int, DispositionType]): + def delivery_outcome(self, delivery: Delivery, outcome: Union[int, DispositionType]): unsettled_delivery = self.unsettled_deliveries[delivery] message = unsettled_delivery.message address = unsettled_delivery.address @@ -218,23 +248,31 @@ class Broker(MessagingHandler): delivery.settle() del unsettled_delivery - def 
_declare_txn(self): + def _declare_txn(self) -> Transaction: tid = bytes(uuid.uuid4().bytes) - self.txns[tid] = [] - return tid + txn = Transaction(tid, time.monotonic()) + self.txns[tid] = txn + return txn - def _discharge_txn(self, tid, failed): + def _discharge_txn(self, txn: Transaction, failed) -> Optional[Condition]: + error = None + tid = txn.tid if not failed: # Commit - self._verbose_print(f"{tid=}: Commit") - for action in self.txns[tid]: - action.commit(self) + txn_time = time.monotonic() - txn.start_time + if self.txn_timeout > 0 and txn_time > self.txn_timeout: + error = Condition('amqp:transaction:timeout', f"timeout: {self.txn_timeout}s time: {txn_time}s") + else: + error = txn.commit(self) + if error is not None: + self._verbose_print(f"{tid=}: Commit failed: {error} -> rollback") + txn.rollback(self) else: # Rollback self._verbose_print(f"{tid=}: Rollback") - for action in self.txns[tid]: - action.rollback(self) + txn.rollback(self) del self.txns[tid] + return error def _coordinator_message(self, msg, delivery): body = msg.body @@ -243,23 +281,26 @@ class Broker(MessagingHandler): d = body.descriptor if d == "amqp:declare:list": # Allocate transaction id - tid = self._declare_txn() - self._verbose_print(f"{tid=}: Declare") - delivery.local = DeclaredDisposition(tid) - link._txns.add(tid) + txn = self._declare_txn() + self._verbose_print(f"{txn.tid=}: Declare") + delivery.local = DeclaredDisposition(txn.tid) + link._txns.add(txn) elif d == "amqp:discharge:list": - # Always accept commit/abort! + # Commit/abort the transaction (commit may fail and be rejected). 
value = body.value tid = bytes(value[0]) failed = bool(value[1]) - if tid in link._txns: - self._discharge_txn(tid, failed) - delivery.update(Disposition.ACCEPTED) - link._txns.remove(tid) + txn = self.txns.get(tid) + if txn is not None: + error = self._discharge_txn(txn, failed) + if error is None: + delivery.update(Disposition.ACCEPTED) + else: + delivery.update(RejectedDisposition(error)) + link._txns.remove(txn) else: self._verbose_print(f"{tid=}: Discharge unknown txn-id: {failed=}") - delivery.local.condition = Condition('amqp:transaction:unknown-id') - delivery.update(Disposition.REJECTED) + delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id'))) delivery.settle() def on_link_closing(self, event): @@ -268,8 +309,8 @@ class Broker(MessagingHandler): self._unsubscribe(link) elif link.target.type == Terminus.COORDINATOR: # Abort any remaining active transactions - for tid in link._txns: - self._discharge_txn(tid, failed=True) + for txn in link._txns: + self._discharge_txn(txn, failed=True) link._txns.clear() def _remove_stale_consumers(self, connection): @@ -281,8 +322,8 @@ class Broker(MessagingHandler): for link in connection.links(Endpoint.LOCAL_ACTIVE): if link.target.type == Terminus.COORDINATOR: # Abort any remaining active transactions - for tid in link._txns: - self._discharge_txn(tid, failed=True) + for txn in link._txns: + self._discharge_txn(txn, failed=True) link._txns.clear() def on_connection_closing(self, event): @@ -327,16 +368,14 @@ class Broker(MessagingHandler): if tid in self.txns: if address: self._verbose_print(f"{tid=}: Message: {message.id=}") - self.txns[tid].append(QueueMessage(delivery, message, address)) + self.txns[tid].add_action(QueueMessage(delivery, message, address)) else: - self.txns[tid].append(RejectDelivery(delivery)) - delivery.local = TransactionalDisposition(tid, ldisposition) - delivery.update() + self.txns[tid].add_action(RejectDelivery(delivery)) + delivery.update(TransactionalDisposition(tid, 
ldisposition)) return else: self._verbose_print(f"{tid=}: Message: unknown txn-id") - delivery.local.condition = Condition('amqp:transaction:unknown-id') - delivery.update(Disposition.REJECTED) + delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id'))) delivery.settle() return @@ -366,14 +405,13 @@ class Broker(MessagingHandler): outcome = disposition.outcome_type if tid in self.txns: self._verbose_print(f"{tid=}: Delivery update: outcome={outcome}") - self.txns[tid].append(DeliveryUpdate(delivery, outcome)) + self.txns[tid].add_action(DeliveryUpdate(delivery, outcome)) return else: self._verbose_print(f"{tid=}: Delivery update: unknown txn-id") - delivery.local.condition = Condition('amqp:transaction:unknown-id') - delivery.update(Disposition.REJECTED) + delivery.update(RejectedDisposition(Condition('amqp:transaction:unknown-id'))) else: - self.delivery_update(delivery, disposition.type) + self.delivery_outcome(delivery, disposition.type) # The delivery is settled in every case except a valid transaction # where the outcome is not yet known until the transaction is discharged. delivery.settle() @@ -387,10 +425,12 @@ def main(): help="address router listens on (default %default)") parser.add_option("-l", "--redelivery-limit", default=5, help="maximum redelivery attempts (default %default)") + parser.add_option("-t", "--txn-timeout", default=0, + help="transaction timeout in seconds (default %default)") opts, args = parser.parse_args() try: - Container(Broker(opts.address, opts.verbose, opts.redelivery_limit)).run() + Container(Broker(opts.address, opts.verbose, int(opts.redelivery_limit), float(opts.txn_timeout))).run() except KeyboardInterrupt: pass --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
