This is an automated email from the ASF dual-hosted git repository.

asf-gitbox-commits pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 4db5f231edcfef0dd9f630383db945cba7f89510
Author: Andrew Stitcher <[email protected]>
AuthorDate: Mon Apr 20 16:37:13 2026 -0400

    PROTON-2921: Change default message rollback behaviour
    
    * Default rollback behaviour is now to use a modified disposition
      with failed delivery.
    * Do this by default on receiving transactional messages within a failed
      or rolled back transaction.
    * Create options to change transactional defaults:
      Currently there is only one option which sets the default behaviour
      for received messages in a rolled back transaction.
    
    This code was written with the assistance of Cursor.
---
 cpp/CMakeLists.txt                         |  1 +
 cpp/examples/tx_recv.cpp                   |  6 ---
 cpp/include/proton/fwd.hpp                 |  1 +
 cpp/include/proton/session.hpp             |  4 ++
 cpp/include/proton/transaction_options.hpp | 76 ++++++++++++++++++++++++++++++
 cpp/src/contexts.hpp                       |  2 +
 cpp/src/messaging_adapter.cpp              | 29 ++++++++++++
 cpp/src/session.cpp                        |  7 ++-
 cpp/src/transaction_options.cpp            | 76 ++++++++++++++++++++++++++++++
 9 files changed, 195 insertions(+), 7 deletions(-)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 287bbb5af..467781cf8 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -119,6 +119,7 @@ set(qpid-proton-cpp-source
   src/target.cpp
   src/terminus.cpp
   src/timestamp.cpp
+  src/transaction_options.cpp
   src/tracker.cpp
   src/transfer.cpp
   src/transport.cpp
diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp
index ef6c41bcd..b021a16bf 100644
--- a/cpp/examples/tx_recv.cpp
+++ b/cpp/examples/tx_recv.cpp
@@ -91,12 +91,6 @@ class tx_recv : public proton::messaging_handler {
 
     void on_session_transaction_aborted(proton::session &s) override {
         std::cout << "Transaction aborted!" << std::endl;
-        std::cout << "Releasing all unsettled deliveries back to the 
broker..." << std::endl;
-        for (auto r: s.receivers()) {
-            for (auto d : r.unsettled_deliveries()) {
-                d.release();
-            }
-        }
         std::cout << "Re-declaring transaction now..." << std::endl;
         current_batch = 0;
         s.transaction_declare();
diff --git a/cpp/include/proton/fwd.hpp b/cpp/include/proton/fwd.hpp
index 801d26932..cc22feb7e 100644
--- a/cpp/include/proton/fwd.hpp
+++ b/cpp/include/proton/fwd.hpp
@@ -54,6 +54,7 @@ class session_options;
 class source_options;
 class ssl;
 class target_options;
+class transaction_options;
 class tracker;
 class transport;
 class url;
diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp
index d7ad67415..ed8bb78a1 100644
--- a/cpp/include/proton/session.hpp
+++ b/cpp/include/proton/session.hpp
@@ -114,6 +114,10 @@ PN_CPP_CLASS_EXTERN session : public 
internal::object<pn_session_t>, public endp
     /// @see transactions_page
     PN_CPP_EXTERN void transaction_declare();
 
+    /// @copydoc transaction_declare
+    /// **Unsettled API** - Declare using transaction options (see 
proton::transaction_options).
+    PN_CPP_EXTERN void transaction_declare(const transaction_options& opts);
+
     /// **Unsettled API** - Commit the currently declared transaction.
     ///
     /// Outcome is delivered asynchronously via 
messaging_handler::on_session_transaction_committed
diff --git a/cpp/include/proton/transaction_options.hpp 
b/cpp/include/proton/transaction_options.hpp
new file mode 100644
index 000000000..68d0c604e
--- /dev/null
+++ b/cpp/include/proton/transaction_options.hpp
@@ -0,0 +1,76 @@
+#ifndef PROTON_TRANSACTION_OPTIONS_HPP
+#define PROTON_TRANSACTION_OPTIONS_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "./internal/export.hpp"
+
+#include <memory>
+
+/// @file
+/// @copybrief proton::transaction_options
+
+namespace proton {
+
+/// Options for declaring a session transaction.
+///
+/// Options can be "chained" (see proton::connection_options).
+///
+/// Normal value semantics: copy or assign creates a separate copy of
+/// the options.
+class transaction_options {
+  public:
+    /// Create a set of options (unset fields use their documented defaults on declare).
+    PN_CPP_EXTERN transaction_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN transaction_options(const transaction_options&);
+
+    /// Destroy the options.
+    PN_CPP_EXTERN ~transaction_options();
+
+    /// Copy options.
+    PN_CPP_EXTERN transaction_options& operator=(const transaction_options&);
+
+    /// Merge with another option set; values from \p other take precedence.
+    /// Only options that were explicitly set in \p other are merged.
+    PN_CPP_EXTERN void update(const transaction_options& other);
+
+    /// **Unsettled API** — If true, when a transaction is aborted, or a commit
+    /// discharge is rejected, the client automatically dispositions any unsettled
+    /// incoming deliveries (on receiver links) in this session with a modified (failed)
+    /// outcome and settles them. If false, the application must update those
+    /// deliveries itself.
+    /// The default is true.
+    ///
+    /// @return a reference to this object so that option calls can be chained.
+    PN_CPP_EXTERN transaction_options& auto_modify_on_abort(bool);
+
+  private:
+    /// Apply these options to the transaction state of the given session.
+    /// Called from session::transaction_declare().
+    void apply(class session&) const;
+
+    /// Private implementation holding the option values.
+    class impl;
+    std::unique_ptr<impl> impl_;
+
+  /// @cond INTERNAL
+  friend class session;
+  /// @endcond
+};
+
+} // namespace proton
+
+#endif // PROTON_TRANSACTION_OPTIONS_HPP
diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index e85ff06f5..80aae6dae 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -168,6 +168,8 @@ class transaction_context {
     pn_condition_t* error = nullptr;
     binary transaction_id;
     bool failed = false;
+    /// When true, abort/fail paths in messaging_adapter modify unsettled 
incoming deliveries.
+    bool auto_modify_on_abort = true;
     enum class State {
       NO_TRANSACTION,
       DECLARING,
diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp
index 27ce28a58..ff029c7e9 100644
--- a/cpp/src/messaging_adapter.cpp
+++ b/cpp/src/messaging_adapter.cpp
@@ -23,6 +23,7 @@
 
 #include "proton/connection.hpp"
 #include "proton/delivery.hpp"
+#include "proton/disposition.h"
 #include "proton/error.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/receiver.hpp"
@@ -151,6 +152,28 @@ void settle_incoming_deliveries(session& s) {
     }
 }
 
+// Settle every unsettled incoming delivery on the receiver links of session `s`
+// with a "modified" outcome whose delivery-failed flag is set.
+//
+// Called when a transaction is aborted or its discharge is rejected, so that the
+// deliveries received in that transaction are reported as definitively failed.
+void modify_incoming_deliveries(session& s) {
+    auto session = unwrap(s);
+    // Links are enumerated per connection, so keep only the receivers that
+    // belong to this session.
+    for (auto link = pn_link_head(pn_session_connection(session), 0);
+         link;
+         link = pn_link_next(link, 0)) {
+        if (!pn_link_is_receiver(link) || pn_link_session(link) != session) continue;
+
+        for (auto delivery = pn_unsettled_head(link); delivery;) {
+            // Fetch the successor before settling: a settled delivery must not
+            // be used again, so it cannot be used to advance the iteration.
+            auto next = pn_unsettled_next(delivery);
+
+            // Replace any existing local state with modified(delivery-failed=true).
+            auto disposition = pn_delivery_local(delivery);
+            pn_disposition_clear(disposition);
+            auto modified_disp = pn_modified_disposition(disposition);
+            pn_modified_disposition_set_failed(modified_disp, true);
+            pn_delivery_update(delivery, PN_MODIFIED);
+            pn_delivery_settle(delivery);
+
+            delivery = next;
+        }
+    }
+}
+
 void handle_transaction_coordinator_outcome(messaging_handler& handler, const 
tracker& t) {
     auto session = t.session();
     auto& session_context = session_context::get(unwrap(session));
@@ -180,6 +203,9 @@ void 
handle_transaction_coordinator_outcome(messaging_handler& handler, const tr
                 // Transaction abort is successful
                 transaction_context->state = 
transaction_context::State::NO_TRANSACTION;
                 settle_outgoing_deliveries(session);
+                if (transaction_context->auto_modify_on_abort) {
+                    modify_incoming_deliveries(session);
+                }
                 handler.on_session_transaction_aborted(session);
                 return;
             } else {
@@ -212,6 +238,9 @@ void 
handle_transaction_coordinator_outcome(messaging_handler& handler, const tr
                 transaction_context->state = 
transaction_context::State::NO_TRANSACTION;
                 settle_outgoing_deliveries(session);
                 transaction_context->error = 
pn_rejected_disposition_condition(rejected_disp);
+                if (transaction_context->auto_modify_on_abort) {
+                    modify_incoming_deliveries(session);
+                }
                 handler.on_session_transaction_aborted(session);
                 transaction_context->error = nullptr;
                 return;
diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp
index 84956e4c2..05b4a02ce 100644
--- a/cpp/src/session.cpp
+++ b/cpp/src/session.cpp
@@ -28,6 +28,7 @@
 #include "proton/receiver_options.hpp"
 #include "proton/sender_options.hpp"
 #include "proton/session_options.hpp"
+#include "proton/transaction_options.hpp"
 #include "proton/target_options.hpp"
 #include "proton/tracker.hpp"
 #include "proton/transfer.hpp"
@@ -221,7 +222,9 @@ bool has_unsettled_outgoing_deliveries(const session& s) {
 
 }
 
-void session::transaction_declare() {
+// Declare a transaction using default-constructed options (no option explicitly set).
+void session::transaction_declare() { transaction_declare(transaction_options()); }
+
+void session::transaction_declare(const transaction_options& opts) {
     if (!transaction_is_empty(*this))
         throw proton::error("Session has already declared transaction");
 
@@ -239,6 +242,8 @@ void session::transaction_declare() {
         txn_context = 
std::make_unique<transaction_context>(open_coordinator_sender(*this));
     }
 
+    opts.apply(*this);
+
     // Declare txn
     txn_context->state = transaction_context::State::DECLARING;
 
diff --git a/cpp/src/transaction_options.cpp b/cpp/src/transaction_options.cpp
new file mode 100644
index 000000000..9258a3d3a
--- /dev/null
+++ b/cpp/src/transaction_options.cpp
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/transaction_options.hpp"
+
+#include "proton/session.hpp"
+
+#include "contexts.hpp"
+#include "proton_bits.hpp"
+
+namespace proton {
+
+namespace {
+
+// A value paired with a flag recording whether it has been explicitly assigned.
+template <class T> struct option {
+    T value{};         // Value-initialized until assigned.
+    bool set = false;  // True once a value has been assigned.
+
+    option() = default;
+
+    // Store x and mark the option as set.
+    option& operator=(const T& x) {
+        value = x;
+        set = true;
+        return *this;
+    }
+
+    // Take over x's value only if x has been set; otherwise leave this unchanged.
+    void update(const option<T>& x) {
+        if (!x.set) return;
+        value = x.value;
+        set = true;
+    }
+};
+
+}
+
+// Private implementation of transaction_options: each option is stored together
+// with a flag recording whether it was explicitly set.
+class transaction_options::impl {
+  public:
+    option<bool> auto_modify_on_abort;
+
+    // Copy the option values into the transaction context of session `s`.
+    //
+    // The session's transaction context must already exist;
+    // session::transaction_declare creates it before calling apply.
+    void apply(session& s) const {
+        auto& session_context = session_context::get(unwrap(s));
+        auto& transaction_context = session_context.transaction_context_;
+
+        // Always assign, falling back to the documented default (true) when the
+        // option is unset. The transaction context may be reused by later
+        // declares on the same session, so leaving the field untouched would let
+        // a value chosen for an earlier transaction leak into this one.
+        transaction_context->auto_modify_on_abort =
+            auto_modify_on_abort.set ? auto_modify_on_abort.value : true;
+    }
+
+    // Merge: values explicitly set in `o` override the ones held here.
+    void update(const impl& o) { auto_modify_on_abort.update(o.auto_modify_on_abort); }
+};
+
+// Construct an option set in which nothing has been explicitly set.
+transaction_options::transaction_options() : impl_(std::make_unique<impl>()) {}
+
+// Copy construction gives the new object its own copy of the option values.
+transaction_options::transaction_options(const transaction_options& o)
+    : impl_(std::make_unique<impl>(*o.impl_)) {}
+
+transaction_options::~transaction_options() = default;
+
+// Copy assignment overwrites the held option values with those of `o`.
+transaction_options& transaction_options::operator=(const transaction_options& o) {
+    impl& mine = *impl_;
+    mine = *o.impl_;
+    return *this;
+}
+
+// Merge the options explicitly set in `o` into this object.
+void transaction_options::update(const transaction_options& o) {
+    impl_->update(*o.impl_);
+}
+
+// Set the auto_modify_on_abort option; returns *this to allow chaining.
+transaction_options& transaction_options::auto_modify_on_abort(bool b) {
+    impl_->auto_modify_on_abort = b;
+    return *this;
+}
+
+// Forward to the implementation, which writes the options into the session.
+void transaction_options::apply(session& s) const {
+    impl_->apply(s);
+}
+
+} // namespace proton


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to