This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 4db5f231edcfef0dd9f630383db945cba7f89510 Author: Andrew Stitcher <[email protected]> AuthorDate: Mon Apr 20 16:37:13 2026 -0400 PROTON-2921: Change default message rollback behaviour * Default rollback behaviour is now to use a modified disposition with failed delivery. * Do this by default on receiving transactional messages within a failed or rolled back transaction. * Create options to change transactional defaults: Currently there is only one option which sets the default behaviour for received messages in a rolled back transaction. This code was written with the assistance of Cursor. --- cpp/CMakeLists.txt | 1 + cpp/examples/tx_recv.cpp | 6 --- cpp/include/proton/fwd.hpp | 1 + cpp/include/proton/session.hpp | 4 ++ cpp/include/proton/transaction_options.hpp | 76 ++++++++++++++++++++++++++++++ cpp/src/contexts.hpp | 2 + cpp/src/messaging_adapter.cpp | 29 ++++++++++++ cpp/src/session.cpp | 7 ++- cpp/src/transaction_options.cpp | 76 ++++++++++++++++++++++++++++++ 9 files changed, 195 insertions(+), 7 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 287bbb5af..467781cf8 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -119,6 +119,7 @@ set(qpid-proton-cpp-source src/target.cpp src/terminus.cpp src/timestamp.cpp + src/transaction_options.cpp src/tracker.cpp src/transfer.cpp src/transport.cpp diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp index ef6c41bcd..b021a16bf 100644 --- a/cpp/examples/tx_recv.cpp +++ b/cpp/examples/tx_recv.cpp @@ -91,12 +91,6 @@ class tx_recv : public proton::messaging_handler { void on_session_transaction_aborted(proton::session &s) override { std::cout << "Transaction aborted!" << std::endl; - std::cout << "Releasing all unsettled deliveries back to the broker..." << std::endl; - for (auto r: s.receivers()) { - for (auto d : r.unsettled_deliveries()) { - d.release(); - } - } std::cout << "Re-declaring transaction now..."
<< std::endl; current_batch = 0; s.transaction_declare(); diff --git a/cpp/include/proton/fwd.hpp b/cpp/include/proton/fwd.hpp index 801d26932..cc22feb7e 100644 --- a/cpp/include/proton/fwd.hpp +++ b/cpp/include/proton/fwd.hpp @@ -54,6 +54,7 @@ class session_options; class source_options; class ssl; class target_options; +class transaction_options; class tracker; class transport; class url; diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp index d7ad67415..ed8bb78a1 100644 --- a/cpp/include/proton/session.hpp +++ b/cpp/include/proton/session.hpp @@ -114,6 +114,10 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// @see transactions_page PN_CPP_EXTERN void transaction_declare(); + /// @copydoc transaction_declare + /// **Unsettled API** - Declare using transaction options (see proton::transaction_options). + PN_CPP_EXTERN void transaction_declare(const transaction_options& opts); + /// **Unsettled API** - Commit the currently declared transaction. /// /// Outcome is delivered asynchronously via messaging_handler::on_session_transaction_committed diff --git a/cpp/include/proton/transaction_options.hpp b/cpp/include/proton/transaction_options.hpp new file mode 100644 index 000000000..68d0c604e --- /dev/null +++ b/cpp/include/proton/transaction_options.hpp @@ -0,0 +1,76 @@ +#ifndef PROTON_TRANSACTION_OPTIONS_HPP +#define PROTON_TRANSACTION_OPTIONS_HPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "./internal/export.hpp" + +#include <memory> + +/// @file +/// @copybrief proton::transaction_options + +namespace proton { + +/// Options for declaring a session transaction. +/// +/// Options can be "chained" (see proton::connection_options). +/// +/// Normal value semantics: copy or assign creates a separate copy of +/// the options. +class transaction_options { + public: + /// Create a set of options (unset fields use their documented defaults on declare). + PN_CPP_EXTERN transaction_options(); + + /// Copy options. + PN_CPP_EXTERN transaction_options(const transaction_options&); + + PN_CPP_EXTERN ~transaction_options(); + + /// Copy options. + PN_CPP_EXTERN transaction_options& operator=(const transaction_options&); + + /// Merge with another option set; values from \p other take precedence. + PN_CPP_EXTERN void update(const transaction_options& other); + + /// **Unsettled API** — If true, when a transaction is aborted, or a commit + /// discharge is rejected, the client automatically dispositions any unsettled + /// incoming deliveries (on receiver links) in this session with a modified (failed) + /// outcome. If false, the application must update those deliveries itself. + /// The default is true. 
+ PN_CPP_EXTERN transaction_options& auto_modify_on_abort(bool); + + private: + void apply(class session&) const; + + class impl; + std::unique_ptr<impl> impl_; + + /// @cond INTERNAL + friend class session; + /// @endcond +}; + +} // namespace proton + +#endif // PROTON_TRANSACTION_OPTIONS_HPP diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index e85ff06f5..80aae6dae 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -168,6 +168,8 @@ class transaction_context { pn_condition_t* error = nullptr; binary transaction_id; bool failed = false; + /// When true, abort/fail paths in messaging_adapter modify unsettled incoming deliveries. + bool auto_modify_on_abort = true; enum class State { NO_TRANSACTION, DECLARING, diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index 27ce28a58..ff029c7e9 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -23,6 +23,7 @@ #include "proton/connection.hpp" #include "proton/delivery.hpp" +#include "proton/disposition.h" #include "proton/error.hpp" #include "proton/messaging_handler.hpp" #include "proton/receiver.hpp" @@ -151,6 +152,28 @@ void settle_incoming_deliveries(session& s) { } } +void modify_incoming_deliveries(session& s) { + // When the transaction aborts or errors, settle all incoming deliveries (on receiver links) with a modified + // disposition. Their delivery now definitively failed. 
+ auto session = unwrap(s); + for (auto link = pn_link_head(pn_session_connection(session), 0); + link; + link = pn_link_next(link, 0)) { + if (pn_link_is_receiver(link) && pn_link_session(link) == session) { + for (auto delivery = pn_unsettled_head(link); + delivery; + delivery = pn_unsettled_next(delivery)) { + auto disposition = pn_delivery_local(delivery); + pn_disposition_clear(disposition); + auto modified_disp = pn_modified_disposition(disposition); + pn_modified_disposition_set_failed(modified_disp, true); + pn_delivery_update(delivery, PN_MODIFIED); + pn_delivery_settle(delivery); + } + } + } +} + void handle_transaction_coordinator_outcome(messaging_handler& handler, const tracker& t) { auto session = t.session(); auto& session_context = session_context::get(unwrap(session)); @@ -180,6 +203,9 @@ void handle_transaction_coordinator_outcome(messaging_handler& handler, const tr // Transaction abort is successful transaction_context->state = transaction_context::State::NO_TRANSACTION; settle_outgoing_deliveries(session); + if (transaction_context->auto_modify_on_abort) { + modify_incoming_deliveries(session); + } handler.on_session_transaction_aborted(session); return; } else { @@ -212,6 +238,9 @@ void handle_transaction_coordinator_outcome(messaging_handler& handler, const tr transaction_context->state = transaction_context::State::NO_TRANSACTION; settle_outgoing_deliveries(session); transaction_context->error = pn_rejected_disposition_condition(rejected_disp); + if (transaction_context->auto_modify_on_abort) { + modify_incoming_deliveries(session); + } handler.on_session_transaction_aborted(session); transaction_context->error = nullptr; return; diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp index 84956e4c2..05b4a02ce 100644 --- a/cpp/src/session.cpp +++ b/cpp/src/session.cpp @@ -28,6 +28,7 @@ #include "proton/receiver_options.hpp" #include "proton/sender_options.hpp" #include "proton/session_options.hpp" +#include "proton/transaction_options.hpp" 
#include "proton/target_options.hpp" #include "proton/tracker.hpp" #include "proton/transfer.hpp" @@ -221,7 +222,9 @@ bool has_unsettled_outgoing_deliveries(const session& s) { } -void session::transaction_declare() { +void session::transaction_declare() { transaction_declare(transaction_options()); } + +void session::transaction_declare(const transaction_options& opts) { if (!transaction_is_empty(*this)) throw proton::error("Session has already declared transaction"); @@ -239,6 +242,8 @@ void session::transaction_declare() { txn_context = std::make_unique<transaction_context>(open_coordinator_sender(*this)); } + opts.apply(*this); + // Declare txn txn_context->state = transaction_context::State::DECLARING; diff --git a/cpp/src/transaction_options.cpp b/cpp/src/transaction_options.cpp new file mode 100644 index 000000000..9258a3d3a --- /dev/null +++ b/cpp/src/transaction_options.cpp @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "proton/transaction_options.hpp" + +#include "proton/session.hpp" + +#include "contexts.hpp" +#include "proton_bits.hpp" + +namespace proton { + +namespace { + +template <class T> struct option { + T value; + bool set; + + option() : value(), set(false) {} + option& operator=(const T& x) { value = x; set = true; return *this; } + void update(const option<T>& x) { if (x.set) *this = x.value; } +}; + +} + +class transaction_options::impl { + public: + option<bool> auto_modify_on_abort; + + void apply(session& s) { + auto& session_context = session_context::get(unwrap(s)); + auto& transaction_context = session_context.transaction_context_; + + if (auto_modify_on_abort.set) transaction_context->auto_modify_on_abort = auto_modify_on_abort.value; + } + + void update(const impl& o) { auto_modify_on_abort.update(o.auto_modify_on_abort); } +}; + +transaction_options::transaction_options() : impl_(new impl()) {} +transaction_options::transaction_options(const transaction_options& o) : impl_(new impl()) { *this = o; } +transaction_options::~transaction_options() = default; + +transaction_options& transaction_options::operator=(const transaction_options& o) { + *impl_ = *o.impl_; + return *this; +} + +void transaction_options::update(const transaction_options& o) { impl_->update(*o.impl_); } + +transaction_options& transaction_options::auto_modify_on_abort(bool b) { + impl_->auto_modify_on_abort = b; + return *this; +} + +void transaction_options::apply(session& s) const { impl_->apply(s); } + +} // namespace proton --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
