This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 7cfcc8344fb4a36ba6ba6d35819a1bf25fca4247
Author: Andrew Stitcher <[email protected]>
AuthorDate: Fri May 1 19:00:26 2026 -0400

    PROTON-1442: Correct outgoing message callbacks
    
    Make sure that the correct callbacks for the final committed
    disposition of messages happen on commit.
    
    [On abort or commit error there is no callback, as there is no
    final disposition update - but there may have been a provisional
    update callback during the transaction]
    
    This code was written with the assistance of Cursor.
---
 cpp/src/contexts.hpp          |  3 +++
 cpp/src/messaging_adapter.cpp | 50 ++++++++++++++++++++++++++++++++++++++++---
 2 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index 80aae6dae..3f559e653 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -26,11 +26,13 @@
 
 #include "proton/work_queue.hpp"
 #include "proton/message.hpp"
+#include "proton/tracker.hpp"
 
 #include "proton/object.h"
 #include "proton/condition.h"
 
 #include <memory>
+#include <unordered_set>
 
 struct pn_record_t;
 struct pn_link_t;
@@ -164,6 +166,7 @@ class session_context : public context {
 class transaction_context {
   public:
     transaction_context(pn_link_t* coordinator);
+    std::unordered_set<tracker> provisional_outcomes;
     pn_link_t* coordinator;
     pn_condition_t* error = nullptr;
     binary transaction_id;
diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp
index ff029c7e9..e4812d7d3 100644
--- a/cpp/src/messaging_adapter.cpp
+++ b/cpp/src/messaging_adapter.cpp
@@ -114,6 +114,36 @@ bool transaction_coordinator_sender(const sender& s) {
     return txn_context && (txn_context->coordinator == unwrap(s));
 }
 
+void handle_outgoing_committed_deliveries(messaging_handler& handler, session 
&s) {
+    auto session = unwrap(s);
+    auto& transaction_context = 
session_context::get(session).transaction_context_;
+    for (auto t : transaction_context->provisional_outcomes) {
+        auto disposition = pn_delivery_remote(unwrap(t));
+        auto trans_disp = pn_transactional_disposition(disposition);
+        auto outcome = trans_disp ? 
pn_transactional_disposition_get_outcome_type(trans_disp) : 
pn_disposition_type(disposition);
+        switch (outcome) {
+        case PN_ACCEPTED:
+            handler.on_tracker_accept(t);
+            break;
+        case PN_REJECTED:
+            handler.on_tracker_reject(t);
+            break;
+        case PN_RELEASED:
+        case PN_MODIFIED:
+            handler.on_tracker_release(t);
+            break;
+        }
+        if (t.settled()) {
+            handler.on_tracker_settle(t);
+            auto lctx = link_context::get(unwrap(t.sender()));
+            if (lctx.auto_settle) {
+                t.settle();
+            }
+        }
+    }
+    transaction_context->provisional_outcomes.clear();
+}
+
 void settle_outgoing_deliveries(session& s) {
     // If the transaction is aborted, then for any messages that we sent that 
are still unsettled, we need to settle them
     // There is no need for a disposition update beyond that as the broker 
will have already done the abort.
@@ -129,6 +159,8 @@ void settle_outgoing_deliveries(session& s) {
             for (auto delivery = pn_unsettled_head(link);
                  delivery;
                  delivery = pn_unsettled_next(delivery)) {
+                // No disposition update just settle (these messages were 
aborted)
+                pn_disposition_clear(pn_delivery_local(delivery));
                 pn_delivery_settle(delivery);
             }
         }
@@ -212,6 +244,7 @@ void 
handle_transaction_coordinator_outcome(messaging_handler& handler, const tr
                 // Transaction commit is successful
                 transaction_context->state = 
transaction_context::State::NO_TRANSACTION;
                 settle_incoming_deliveries(session);
+                handle_outgoing_committed_deliveries(handler, session);
                 handler.on_session_transaction_committed(session);
                 return;
             }
@@ -256,10 +289,17 @@ void 
handle_transaction_coordinator_outcome(messaging_handler& handler, const tr
 }
 
 void handle_transacted_delivery_outcome(messaging_handler& handler, 
pn_delivery_t* d, tracker& t) {
-    // Don't call handlers if we're in the process of committing or aborting 
the transaction.
-    // Because we will get settlement updates at that point.
     auto& session_context = session_context::get(unwrap(t.session()));
     auto& transaction_context = session_context.transaction_context_;
+
+    // We should hold on to tracker here so that we can call the non 
transactional handlers if the transaction
+    // commits without error. We can't do this triggered by the incoming 
settlement later as it will happen before
+    // the transaction commit/abort handlers are called. And semantically the 
accept etc. occur only after we know the
+    // transaction outcome.
+    transaction_context->provisional_outcomes.insert(t);
+
+    // Don't call handlers if we're in the process of committing or aborting 
the transaction.
+    // Because we will get settlement updates at that point.
     if (transaction_context->state != transaction_context::State::DECLARED) {
         return;
     }
@@ -359,7 +399,11 @@ void on_delivery(messaging_handler& handler, pn_event_t* 
event) {
                 handle_transacted_delivery_outcome(handler, dlv, t);
                 break;
             }
-            if (t.settled()) {
+            // If we're currently discharging the transaction, don't deal with 
settlement yet:
+            // We will deal with it when the transaction is committed/aborted.
+            auto& session_context = session_context::get(unwrap(t.session()));
+            auto& transaction_context = session_context.transaction_context_;
+            if (t.settled() && (!transaction_context || 
transaction_context->state != transaction_context::State::DISCHARGING)) {
                 handler.on_tracker_settle(t);
                 if (lctx.auto_settle)
                     t.settle();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to