This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 7cfcc8344fb4a36ba6ba6d35819a1bf25fca4247 Author: Andrew Stitcher <[email protected]> AuthorDate: Fri May 1 19:00:26 2026 -0400 PROTON-1442: Correct outgoing message callbacks Make sure that the correct callbacks for the final committed disposition of messages happen on commit. [On abort or commit error, there is no callback as there is no final disposition update - but there may be a provisional update callback which happened during the transaction] This code was written with the assistance of Cursor. --- cpp/src/contexts.hpp | 3 +++ cpp/src/messaging_adapter.cpp | 50 ++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 50 insertions(+), 3 deletions(-) diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index 80aae6dae..3f559e653 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -26,11 +26,13 @@ #include "proton/work_queue.hpp" #include "proton/message.hpp" +#include "proton/tracker.hpp" #include "proton/object.h" #include "proton/condition.h" #include <memory> +#include <unordered_set> struct pn_record_t; struct pn_link_t; @@ -164,6 +166,7 @@ class session_context : public context { class transaction_context { public: transaction_context(pn_link_t* coordinator); + std::unordered_set<tracker> provisional_outcomes; pn_link_t* coordinator; pn_condition_t* error = nullptr; binary transaction_id; diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index ff029c7e9..e4812d7d3 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -114,6 +114,36 @@ bool transaction_coordinator_sender(const sender& s) { return txn_context && (txn_context->coordinator == unwrap(s)); } +void handle_outgoing_committed_deliveries(messaging_handler& handler, session &s) { + auto session = unwrap(s); + auto& transaction_context = session_context::get(session).transaction_context_; + for (auto t : transaction_context->provisional_outcomes) { + auto disposition = pn_delivery_remote(unwrap(t)); + auto 
trans_disp = pn_transactional_disposition(disposition); + auto outcome = trans_disp ? pn_transactional_disposition_get_outcome_type(trans_disp) : pn_disposition_type(disposition); + switch (outcome) { + case PN_ACCEPTED: + handler.on_tracker_accept(t); + break; + case PN_REJECTED: + handler.on_tracker_reject(t); + break; + case PN_RELEASED: + case PN_MODIFIED: + handler.on_tracker_release(t); + break; + } + if (t.settled()) { + handler.on_tracker_settle(t); + auto lctx = link_context::get(unwrap(t.sender())); + if (lctx.auto_settle) { + t.settle(); + } + } + } + transaction_context->provisional_outcomes.clear(); +} + void settle_outgoing_deliveries(session& s) { // If the transaction is aborted, then for any messages that we sent that are still unsettled, we need to settle them // There is no need for a disposition update beyond that as the broker will have already done the abort. @@ -129,6 +159,8 @@ void settle_outgoing_deliveries(session& s) { for (auto delivery = pn_unsettled_head(link); delivery; delivery = pn_unsettled_next(delivery)) { + // No disposition update just settle (these messages were aborted) + pn_disposition_clear(pn_delivery_local(delivery)); pn_delivery_settle(delivery); } } @@ -212,6 +244,7 @@ void handle_transaction_coordinator_outcome(messaging_handler& handler, const tr // Transaction commit is successful transaction_context->state = transaction_context::State::NO_TRANSACTION; settle_incoming_deliveries(session); + handle_outgoing_committed_deliveries(handler, session); handler.on_session_transaction_committed(session); return; } @@ -256,10 +289,17 @@ void handle_transaction_coordinator_outcome(messaging_handler& handler, const tr } void handle_transacted_delivery_outcome(messaging_handler& handler, pn_delivery_t* d, tracker& t) { - // Don't call handlers if we're in the process of committing or aborting the transaction. - // Because we will get settlement updates at that point. 
auto& session_context = session_context::get(unwrap(t.session())); auto& transaction_context = session_context.transaction_context_; + + // We should hold on to tracker here so that we can call the non transactional handlers if the transaction + // commits without error. We can't do this triggered by the the incoming settlement later as it will happen before + // the transaction commit/abort handlers are called. And semantically the accept etc. occur only after we know the + // transaction outcome. + transaction_context->provisional_outcomes.insert(t); + + // Don't call handlers if we're in the process of committing or aborting the transaction. + // Because we will get settlement updates at that point. if (transaction_context->state != transaction_context::State::DECLARED) { return; } @@ -359,7 +399,11 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { handle_transacted_delivery_outcome(handler, dlv, t); break; } - if (t.settled()) { + // If we're currently discharging the transaction, don't deal with settlement yet: + // We will deal with it when the transaction is committed/aborted. + auto& session_context = session_context::get(unwrap(t.session())); + auto& transaction_context = session_context.transaction_context_; + if (t.settled() && (!transaction_context || transaction_context->state != transaction_context::State::DISCHARGING)) { handler.on_tracker_settle(t); if (lctx.auto_settle) t.settle(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
