This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit e64c9fada89a940448fdf578c01a4e57f2c34844 Author: Andrew Stitcher <[email protected]> AuthorDate: Tue Jun 9 14:53:17 2026 -0400 PROTON-2873: Work around transacted modified dispositions The disposition API currently gives no access to the nested outcome of a transacted modified disposition - however the usual case has the delivery-failed boolean set for this disposition and this is the default behaviour for this disposition (otherwise the release disposition could be used instead in most cases). So we need to work around this until we have a better fix: - When fix a transacted modified outcome to be delivery-failed and redeliverable on the wire. - In the example python broker we assume that any received transacted modify will be failed delivery, redeliverable. - In the C++ API we change the processing of modify to allow transacted modifies to work. --- c/src/core/engine.c | 20 ++++++++++++++------ cpp/src/delivery.cpp | 6 +++++- python/examples/broker.py | 18 +++++++++++++----- 3 files changed, 32 insertions(+), 12 deletions(-) diff --git a/c/src/core/engine.c b/c/src/core/engine.c index 91b7e47c2..f5eafbad1 100644 --- a/c/src/core/engine.c +++ b/c/src/core/engine.c @@ -2152,12 +2152,20 @@ uint64_t pn_transactional_disposition_get_outcome_type(pn_transactional_disposit void pn_transactional_disposition_set_outcome_type(pn_transactional_disposition_t *disposition, uint64_t type) { assert(disposition); - // Generate a described LIST0 directly - this needs a max of 11 bytes - char outcome_scratch[11]; - pn_rwbytes_t scratch = {.size=sizeof(outcome_scratch), .start=outcome_scratch}; - pn_bytes_t outcome_raw = pn_amqp_encode_described_empty_list(&scratch, type); - pn_bytes_free(disposition->outcome_raw); - disposition->outcome_raw = pn_bytes_dup(outcome_raw); + if (type == PN_MODIFIED) { + // Hack to make the transacted modified disposition failed so that delivery counter is increased + char outcome[] = {0x00, 0x53, 0x27, '\xc0', 0x03, 0x02, 0x41, 0x42}; // Can use a fixed byte stream here + pn_bytes_t outcome_raw = pn_bytes(sizeof(outcome), outcome); + pn_bytes_free(disposition->outcome_raw); + disposition->outcome_raw = pn_bytes_dup(outcome_raw); + } else { + // Generate a described LIST0 directly - this needs a max of 11 bytes + char outcome_scratch[11]; + pn_rwbytes_t scratch = {.size=sizeof(outcome_scratch), .start=outcome_scratch}; + pn_bytes_t outcome_raw = pn_amqp_encode_described_empty_list(&scratch, type); + pn_bytes_free(disposition->outcome_raw); + disposition->outcome_raw = pn_bytes_dup(outcome_raw); + } } pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery) diff --git a/cpp/src/delivery.cpp b/cpp/src/delivery.cpp index 72066f38f..2d5939c8d 100644 --- a/cpp/src/delivery.cpp +++ b/cpp/src/delivery.cpp @@ -47,6 +47,10 @@ void settle_delivery(pn_delivery_t* o, uint64_t state) { pn_delivery_update(o, PN_TRANSACTIONAL_STATE); return; } + if (state == proton::transfer::MODIFIED) { + pn_modified_disposition_t* md = pn_modified_disposition(pn_delivery_local(o)); + if (md) pn_modified_disposition_set_failed(md, true); + } pn_delivery_update(o, state); pn_delivery_settle(o); } @@ -62,7 +66,7 @@ delivery::~delivery() = default; void delivery::accept() { settle_delivery(pn_object(), ACCEPTED); } void delivery::reject() { settle_delivery(pn_object(), REJECTED); } void delivery::release() { settle_delivery(pn_object(), RELEASED); } -void delivery::modify() { pn_disposition_set_failed(pn_delivery_local(pn_object()), true); settle_delivery(pn_object(), MODIFIED); } +void delivery::modify() { settle_delivery(pn_object(), MODIFIED); } delivery_iterator delivery_iterator::operator++() { if (!!obj_) { diff --git a/python/examples/broker.py b/python/examples/broker.py index 7852d938a..a791646c7 100755 --- a/python/examples/broker.py +++ b/python/examples/broker.py @@ -210,15 +210,14 @@ class Broker(MessagingHandler): del q self._verbose_print(f"{address=}: Removed") - def _modify_delivery(self, delivery: Delivery, message: Message, address: str): - disposition = delivery.remote + def _modify_delivery(self, delivery: Delivery, undeliverable: bool, failed: bool, message: Message, address: str): # If not deliverable don't requeue - if disposition.undeliverable: + if undeliverable: self._verbose_print(f"{delivery.tag=}: Modified: Undeliverable: {message.id=}") # Don't requeue the message return # Check if we need to update the delivery count - if disposition.failed: + if failed: if message.delivery_count >= self.redelivery_limit: self._verbose_print(f"{delivery.tag=}: Modified: Redelivery limit exceeded: {message.id=}") # Don't requeue the message @@ -244,7 +243,16 @@ class Broker(MessagingHandler): # Requeue the message from the delivery self._queue(address).publish(message) elif outcome == Disposition.MODIFIED: - self._modify_delivery(delivery, message, address) + disposition = delivery.remote + # This is a little hack which assumes that if the outcome is modified but the disposition is not, + # then the disposition was a transacted modified disposition and should be treated as failed delivery. + if disposition.type == Disposition.MODIFIED: + undeliverable = disposition.undeliverable + failed = disposition.failed + else: + undeliverable = False + failed = True + self._modify_delivery(delivery, undeliverable=undeliverable, failed=failed, message=message, address=address) delivery.settle() del unsettled_delivery --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
