This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit b6673de70e5d0d0d9d09a35c1dbf657f675bafbb Author: Rakhi Kumari <[email protected]> AuthorDate: Sat Oct 4 01:20:17 2025 -0400 PROTON-1442: [C++] Implement local transactions Implement handling for declaring and discharging transactions on AMQP sessions. Added new callbacks which allow the application to respond to transaction events: declaring, committing and aborting transactions; provisionally accepting, rejecting & releasing deliveries in a transaction. This code was written with the assistance of Cursor. --- cpp/include/proton/messaging_handler.hpp | 31 +++++ cpp/include/proton/session.hpp | 27 ++++- cpp/src/contexts.cpp | 6 + cpp/src/contexts.hpp | 23 +++- cpp/src/delivery.cpp | 11 ++ cpp/src/handler.cpp | 10 ++ cpp/src/messaging_adapter.cpp | 187 ++++++++++++++++++++++++++++--- cpp/src/node_options.cpp | 2 - cpp/src/sender.cpp | 9 ++ cpp/src/session.cpp | 114 +++++++++++++++++++ 10 files changed, 399 insertions(+), 21 deletions(-) diff --git a/cpp/include/proton/messaging_handler.hpp b/cpp/include/proton/messaging_handler.hpp index 213dbe73e..0754b0c77 100644 --- a/cpp/include/proton/messaging_handler.hpp +++ b/cpp/include/proton/messaging_handler.hpp @@ -172,6 +172,37 @@ PN_CPP_CLASS_EXTERN messaging_handler { /// The remote peer closed the session with an error condition. PN_CPP_EXTERN virtual void on_session_error(session&); + /// **Unsettled API** - Called when a local transaction is declared. + PN_CPP_EXTERN virtual void on_session_transaction_declared(session&); + + /// **Unsettled API** - Called when a local transaction is discharged successfully. + PN_CPP_EXTERN virtual void on_session_transaction_committed(session&); + + /// **Unsettled API** - Called when a local transaction is discharged unsuccessfully (aborted). + /// This is either due to an explicit abort or a failure during commit. + /// In either case any action taken under the transaction is as if it never + /// happened. + PN_CPP_EXTERN virtual void on_session_transaction_aborted(session&); + + /// **Unsettled API** - Called when a local transaction operation fails. + PN_CPP_EXTERN virtual void on_session_transaction_error(session&); + + /// **Unsettled API** - Called when a transactioned delivery is provisionally accepted. + /// This means that if the transaction successfully commits the delivery + /// will be accepted. If the transaction aborts the delivery will be + /// as if it never happened. + PN_CPP_EXTERN virtual void on_transactional_accept(tracker&); + + /// **Unsettled API** - Called when a transactioned delivery is provisionally rejected. + /// This means that if the transaction successfully commits the delivery + /// will be rejected. + PN_CPP_EXTERN virtual void on_transactional_reject(tracker&); + + /// **Unsettled API** - Called when a transactioned delivery is provisionally released. + /// This means that if the transaction successfully commits the delivery + /// will be released (including modifying the delivery count). + PN_CPP_EXTERN virtual void on_transactional_release(tracker&); + /// The remote peer opened the link. PN_CPP_EXTERN virtual void on_receiver_open(receiver&); diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp index ea7d3bbc1..24438caa9 100644 --- a/cpp/include/proton/session.hpp +++ b/cpp/include/proton/session.hpp @@ -105,14 +105,33 @@ PN_CPP_CLASS_EXTERN session : public internal::object<pn_session_t>, public endp /// Get user data from this session. PN_CPP_EXTERN void* user_data() const; + /// **Unsettled API** - Declare a new local transaction on this session. + PN_CPP_EXTERN void transaction_declare(); + + /// **Unsettled API** - Commit the currently declared transaction. + PN_CPP_EXTERN void transaction_commit(); + + /// **Unsettled API** - Abort the currently declared transaction. + PN_CPP_EXTERN void transaction_abort(); + + /// **Unsettled API** - Return true if a transaction is currently declared. + PN_CPP_EXTERN bool transaction_is_declared() const; + + /// **Unsettled API** - Return the identifier of the current transaction. + PN_CPP_EXTERN binary transaction_id() const; + + /// **Unsettled API** - Return the error condition associated with transaction. + PN_CPP_EXTERN error_condition transaction_error() const; + /// @cond INTERNAL - friend class internal::factory<session>; - friend class session_iterator; + friend class internal::factory<session>; + friend class sender; + friend class session_iterator; /// @endcond }; /// @cond INTERNAL - + /// An iterator of sessions. class session_iterator : public internal::iter_base<session, session_iterator> { public: @@ -126,7 +145,7 @@ class session_iterator : public internal::iter_base<session, session_iterator> { typedef internal::iter_range<session_iterator> session_range; /// @endcond - + } // proton #endif // PROTON_SESSION_HPP diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp index a3b393968..6c2838da7 100644 --- a/cpp/src/contexts.cpp +++ b/cpp/src/contexts.cpp @@ -87,6 +87,12 @@ link_context& link_context::get(pn_link_t* l) { return ref<link_context>(id(pn_link_attachments(l), LINK_CONTEXT)); } +transaction_context::transaction_context(pn_link_t* coordinator_sender) : + coordinator(coordinator_sender) +{} + +session_context::session_context() : handler(nullptr), user_data_(nullptr) {} + session_context& session_context::get(pn_session_t* s) { return ref<session_context>(id(pn_session_attachments(s), SESSION_CONTEXT)); } diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index 7ebab1d7b..e85ff06f5 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -28,6 +28,7 @@ #include "proton/message.hpp" #include "proton/object.h" +#include "proton/condition.h" #include <memory> @@ -147,15 +148,35 @@ class link_context : public context { void* user_data_; }; +class transaction_context; + class session_context : public context { public: - session_context() : handler(0), user_data_(nullptr) {} + session_context(); static session_context& get(pn_session_t* s); messaging_handler* handler; + std::unique_ptr<transaction_context> transaction_context_; void* user_data_; }; +// This is not a context object on its own, but an optional part of session +class transaction_context { + public: + transaction_context(pn_link_t* coordinator); + pn_link_t* coordinator; + pn_condition_t* error = nullptr; + binary transaction_id; + bool failed = false; + enum class State { + NO_TRANSACTION, + DECLARING, + DECLARED, + DISCHARGING, + }; + State state = State::NO_TRANSACTION; +}; + class transfer_context : public context { public: transfer_context() : user_data_(nullptr) {} diff --git a/cpp/src/delivery.cpp b/cpp/src/delivery.cpp index 654f50a36..102eca671 100644 --- a/cpp/src/delivery.cpp +++ b/cpp/src/delivery.cpp @@ -33,9 +33,20 @@ #include "proton/binary.hpp" +#include <proton/session.hpp> + namespace { void settle_delivery(pn_delivery_t* o, uint64_t state) { + proton::session session = proton::make_wrapper(o).session(); + if(session.transaction_is_declared()) { + // Transactional disposition + auto disp = pn_transactional_disposition(pn_delivery_local(o)); + pn_transactional_disposition_set_id(disp, pn_bytes(session.transaction_id())); + pn_transactional_disposition_set_outcome_type(disp, state); + pn_delivery_update(o, PN_TRANSACTIONAL_STATE); + return; + } pn_delivery_update(o, state); pn_delivery_settle(o); } diff --git a/cpp/src/handler.cpp b/cpp/src/handler.cpp index 1632efda6..152c519b2 100644 --- a/cpp/src/handler.cpp +++ b/cpp/src/handler.cpp @@ -65,6 +65,16 @@ void messaging_handler::on_session_open(session &s) { pn_session_open(unwrap(s)); } } + +void messaging_handler::on_session_transaction_declared(session &) {} +void messaging_handler::on_session_transaction_committed(session &) {} +void messaging_handler::on_session_transaction_aborted(session &) {} +void messaging_handler::on_session_transaction_error(session &s) { on_session_error(s); } + +void messaging_handler::on_transactional_accept(tracker &) {} +void messaging_handler::on_transactional_reject(tracker &) {} +void messaging_handler::on_transactional_release(tracker &) {} + void messaging_handler::on_receiver_close(receiver &) {} void messaging_handler::on_receiver_error(receiver &l) { on_error(l.error()); } void messaging_handler::on_receiver_open(receiver &l) { diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index f90cd7613..27ce28a58 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -22,14 +22,11 @@ #include "messaging_adapter.hpp" #include "proton/connection.hpp" -#include "proton/container.hpp" #include "proton/delivery.hpp" #include "proton/error.hpp" #include "proton/messaging_handler.hpp" #include "proton/receiver.hpp" -#include "proton/receiver_options.hpp" #include "proton/sender.hpp" -#include "proton/sender_options.hpp" #include "proton/session.hpp" #include "proton/tracker.hpp" #include "proton/transport.hpp" @@ -37,6 +34,8 @@ #include "contexts.hpp" #include "msg.hpp" #include "proton_bits.hpp" +#include "tracing_private.hpp" +#include "types_internal.hpp" #include <proton/connection.h> #include <proton/delivery.h> @@ -45,8 +44,7 @@ #include <proton/message.h> #include <proton/session.h> #include <proton/transport.h> - -#include "tracing_private.hpp" +#include <proton/types.h> #include <assert.h> @@ -69,7 +67,7 @@ void on_link_flow(messaging_handler& handler, pn_event_t* event) { // TODO: process session flow data, if no link-specific data, just return. if (!lnk) return; int state = pn_link_state(lnk); - if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) { + if (((state & PN_LOCAL_ACTIVE) && (state & PN_REMOTE_ACTIVE))) { link_context& lctx = link_context::get(lnk); if (pn_link_is_sender(lnk)) { if (pn_link_credit(lnk) > 0) { @@ -110,6 +108,151 @@ void message_decode(message& msg, proton::delivery delivery) { pn_link_advance(unwrap(link)); } +bool transaction_coordinator_sender(const sender& s) { + auto& txn_context = session_context::get(unwrap(s.session())).transaction_context_; + return txn_context && (txn_context->coordinator == unwrap(s)); +} + +void settle_outgoing_deliveries(session& s) { + // If the transaction is aborted, then for any messages that we sent that are still unsettled, we need to settle them + // There is no need for a disposition update beyond that as the broker will have already done the abort. + // We do need to know the deliveries that are involved in the transaction so we can settle them - since we + // scope our transactions to a session, we will settle all outgoing deliveries for the session. + auto session = unwrap(s); + auto& transaction_context = session_context::get(session).transaction_context_; + auto coordinator = transaction_context->coordinator; + for (auto link = pn_link_head(pn_session_connection(session), 0); + link; + link = pn_link_next(link, 0)) { + if (pn_link_is_sender(link) && pn_link_session(link) == session && link != coordinator) { + for (auto delivery = pn_unsettled_head(link); + delivery; + delivery = pn_unsettled_next(delivery)) { + pn_delivery_settle(delivery); + } + } + } +} + +void settle_incoming_deliveries(session& s) { + // When the transaction commits, settle all incoming deliveries (on receiver links) that we + // had sent a provisional disposition for. They are now definitively accepted/rejected/released. + auto session = unwrap(s); + for (auto link = pn_link_head(pn_session_connection(session), 0); + link; + link = pn_link_next(link, 0)) { + if (pn_link_is_receiver(link) && pn_link_session(link) == session) { + for (auto delivery = pn_unsettled_head(link); + delivery; + delivery = pn_unsettled_next(delivery)) { + pn_delivery_settle(delivery); + } + } + } +} + +void handle_transaction_coordinator_outcome(messaging_handler& handler, const tracker& t) { + auto session = t.session(); + auto& session_context = session_context::get(unwrap(session)); + auto& transaction_context = session_context.transaction_context_; + auto state = transaction_context->state; + auto disposition = pn_delivery_remote(unwrap(t)); + if (auto *declared_disp = pn_declared_disposition(disposition); declared_disp) { + switch (state) { + case transaction_context::State::DECLARING: { + pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp); + transaction_context->transaction_id = proton::bin(txn_id); + transaction_context->state = transaction_context::State::DECLARED; + handler.on_session_transaction_declared(session); + return; + } + case transaction_context::State::NO_TRANSACTION: + case transaction_context::State::DECLARED: + case transaction_context::State::DISCHARGING: + // Don't throw error here, instead close link with error + make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed", "Received transaction declared disposition in invalid state"}); + transaction_context.reset(); + } + } else if (pn_disposition_type(disposition) == PN_ACCEPTED) { + switch (state) { + case transaction_context::State::DISCHARGING: { + if (transaction_context->failed) { + // Transaction abort is successful + transaction_context->state = transaction_context::State::NO_TRANSACTION; + settle_outgoing_deliveries(session); + handler.on_session_transaction_aborted(session); + return; + } else { + // Transaction commit is successful + transaction_context->state = transaction_context::State::NO_TRANSACTION; + settle_incoming_deliveries(session); + handler.on_session_transaction_committed(session); + return; + } + } + case transaction_context::State::NO_TRANSACTION: + case transaction_context::State::DECLARING: + case transaction_context::State::DECLARED: + // TODO: Don't throw error here, instead detach link or close session? + make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed", "Received transaction accepted disposition in invalid state"}); + transaction_context.reset(); + } + } else if (auto rejected_disp = pn_rejected_disposition(disposition); rejected_disp) { + switch (state) { + case transaction_context::State::DECLARING: + transaction_context->state = transaction_context::State::NO_TRANSACTION; + transaction_context->error = pn_rejected_disposition_condition(rejected_disp); + handler.on_session_transaction_error(session); + transaction_context->error = nullptr; + return; + case transaction_context::State::DISCHARGING: + // Note that rollback cannot fail in AMQP as the outcome would be the same, + // so don't count rejected discharge as an error (although it is a protocol error). + if (!transaction_context->failed) { + transaction_context->state = transaction_context::State::NO_TRANSACTION; + settle_outgoing_deliveries(session); + transaction_context->error = pn_rejected_disposition_condition(rejected_disp); + handler.on_session_transaction_aborted(session); + transaction_context->error = nullptr; + return; + } + case transaction_context::State::NO_TRANSACTION: + case transaction_context::State::DECLARED: + // TODO: Don't throw error here, instead detach link or close session? + make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed", "Received transaction rejected disposition in invalid state"}); + transaction_context.reset(); + } + } + // TODO: Don't ignore unexpected disposition here, instead detach link or close session? +} + +void handle_transacted_delivery_outcome(messaging_handler& handler, pn_delivery_t* d, tracker& t) { + // Don't call handlers if we're in the process of committing or aborting the transaction. + // Because we will get settlement updates at that point. + auto& session_context = session_context::get(unwrap(t.session())); + auto& transaction_context = session_context.transaction_context_; + if (transaction_context->state != transaction_context::State::DECLARED) { + return; + } + + auto disposition = pn_transactional_disposition(pn_delivery_remote(d)); + auto outcome_type = pn_transactional_disposition_get_outcome_type(disposition); + switch (outcome_type) { + case PN_ACCEPTED: + handler.on_transactional_accept(t); + break; + case PN_REJECTED: + handler.on_transactional_reject(t); + break; + case PN_RELEASED: + case PN_MODIFIED: + handler.on_transactional_release(t); + break; + default: + break; + } +} + void on_delivery(messaging_handler& handler, pn_event_t* event) { pn_link_t *lnk = pn_event_link(event); pn_delivery_t *dlv = pn_event_delivery(event); @@ -165,6 +308,12 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { // sender if (pn_delivery_updated(dlv)) { tracker t(make_wrapper<tracker>(dlv)); + // Check for outcome from a transaction coordinator + if (transaction_coordinator_sender(t.sender())) { + handle_transaction_coordinator_outcome(handler, t); + t.settle(); + return; + } ot.on_settled_span(t); switch(pn_delivery_remote_state(dlv)) { case PN_ACCEPTED: @@ -177,6 +326,9 @@ void on_delivery(messaging_handler& handler, pn_event_t* event) { case PN_MODIFIED: handler.on_tracker_release(t); break; + case PN_TRANSACTIONAL_STATE: + handle_transacted_delivery_outcome(handler, dlv, t); + break; } if (t.settled()) { handler.on_tracker_settle(t); @@ -274,13 +426,6 @@ void on_link_local_open(messaging_handler& handler, pn_event_t* event) { void on_link_remote_open(messaging_handler& handler, pn_event_t* event) { auto lnk = pn_event_link(event); - // Currently don't implement (transaction) coordinator - if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) { - auto error = pn_link_condition(lnk); - pn_condition_set_name(error, "amqp:not-implemented"); - pn_link_close(lnk); - return; - } if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link // Copy source and target from remote end. pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk)); @@ -320,10 +465,24 @@ void on_connection_wake(messaging_handler& handler, pn_event_t* event) { } -void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event) +void messaging_adapter::dispatch(messaging_handler& h, pn_event_t* event) { pn_event_type_t type = pn_event_type(event); + // If this is an event for an (internal) transaction coordinator link set the handler to a null handler + // Unless its a delivery event which we need to process for transaction outcomes + messaging_handler& handler = [&]() -> messaging_handler& { + if (pn_link_t *lnk = pn_event_link(event); + type != PN_DELIVERY && + lnk && pn_link_is_sender(lnk) && + transaction_coordinator_sender(sender(make_wrapper<sender>(lnk)))) { + static messaging_handler null_handler; + return null_handler; + } else { + return h; + } + }(); + // Only handle events we are interested in switch(type) { case PN_CONNECTION_BOUND: on_connection_bound(handler, event); break; diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp index fd489baf3..d6ec52df1 100644 --- a/cpp/src/node_options.cpp +++ b/cpp/src/node_options.cpp @@ -203,6 +203,4 @@ target_options& target_options::dynamic_properties(const target::dynamic_propert void target_options::apply(target& s) const { impl_->apply(s); } - - } // namespace proton diff --git a/cpp/src/sender.cpp b/cpp/src/sender.cpp index 8b3942bfe..9f4f4d91d 100644 --- a/cpp/src/sender.cpp +++ b/cpp/src/sender.cpp @@ -23,6 +23,7 @@ #include "proton/link.hpp" #include "proton/sender_options.hpp" +#include "proton/session.hpp" #include "proton/source.hpp" #include "proton/target.hpp" #include "proton/tracker.hpp" @@ -34,6 +35,7 @@ #include "proton_bits.hpp" #include "contexts.hpp" #include "tracing_private.hpp" +#include "types_internal.hpp" #include <assert.h> @@ -84,6 +86,13 @@ tracker sender::send(const message &message, const binary &tag) { pn_delivery_settle(dlv); if (!pn_link_credit(pn_object())) link_context::get(pn_object()).draining = false; + + // If transaction is declared + if (session().transaction_is_declared()) { + auto disp = pn_transactional_disposition(pn_delivery_local(unwrap(track))); + pn_transactional_disposition_set_id(disp, pn_bytes(session().transaction_id())); + } + return track; } diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp index b8f777a00..84956e4c2 100644 --- a/cpp/src/session.cpp +++ b/cpp/src/session.cpp @@ -21,15 +21,26 @@ #include "proton/session.hpp" #include "proton/connection.hpp" +#include "proton/container.hpp" +#include "proton/delivery.hpp" +#include "proton/error.hpp" +#include "proton/messaging_handler.hpp" #include "proton/receiver_options.hpp" #include "proton/sender_options.hpp" #include "proton/session_options.hpp" +#include "proton/target_options.hpp" +#include "proton/tracker.hpp" +#include "proton/transfer.hpp" +#include "proton/types.hpp" #include "contexts.hpp" #include "link_namer.hpp" +#include "proactor_container_impl.hpp" #include "proton_bits.hpp" +#include "types_internal.hpp" #include <proton/connection.h> +#include "proton/delivery.h" #include <proton/session.h> #include <string> @@ -148,4 +159,107 @@ void* session::user_data() const { return sctx.user_data_; } +namespace { + +std::unique_ptr<transaction_context>& get_transaction_context(const session& s) { + return session_context::get(unwrap(s)).transaction_context_; +} + +bool transaction_is_empty(const session& s) { + auto& txn = get_transaction_context(s); + return !txn || txn->state == transaction_context::State::NO_TRANSACTION; +} + +proton::tracker transaction_send_ctrl(sender&& coordinator, const symbol& descriptor, const value& value) { + proton::value msg_value; + proton::codec::encoder enc(msg_value); + enc << proton::codec::start::described() + << descriptor + << value + << proton::codec::finish(); + + return coordinator.send(msg_value); +} + +void transaction_discharge(const session& s, bool failed) { + auto& transaction_context = get_transaction_context(s); + if (transaction_is_empty(s) || transaction_context->state != transaction_context::State::DECLARED) + throw proton::error("Only a declared txn can be discharged."); + transaction_context->state = transaction_context::State::DISCHARGING; + + transaction_context->failed = failed; + transaction_send_ctrl( + make_wrapper<sender>(transaction_context->coordinator), + "amqp:discharge:list", std::list<proton::value>{transaction_context->transaction_id, failed}); +} + +pn_link_t* open_coordinator_sender(session& s) { + auto l = pn_sender(unwrap(s), next_link_name(s.connection()).c_str()); + auto t = pn_link_target(l); + pn_terminus_set_type(t, PN_COORDINATOR); + auto caps = pn_terminus_capabilities(t); + // As we only have a single symbol in the capabilities we don't have to create an array + pn_data_put_symbol(caps, pn_bytes("amqp:local-transactions")); + pn_link_open(l); + return l; +} + +bool has_unsettled_outgoing_deliveries(const session& s) { + auto session = unwrap(s); + auto& transaction_context = get_transaction_context(s); + auto coordinator = transaction_context ? transaction_context->coordinator : nullptr; + auto link = pn_link_head(pn_session_connection(session), 0); + while (link) { + if (pn_link_is_sender(link) && pn_link_session(link) == session && link != coordinator) { + if (pn_link_unsettled(link) > 0) + return true; + } + link = pn_link_next(link, 0); + } + return false; +} + +} + +void session::transaction_declare() { + if (!transaction_is_empty(*this)) + throw proton::error("Session has already declared transaction"); + + // Check to see if there are any unsettled deliveries for this session + // This is to simplify keeping track of the deliveries that are involved in the transaction: + // The simplification is that we will only allow transactioned messages to be sent on this session + // so that we can find all the transactioned outgoing deliveries for the session. + // If we want to allow multiple transactions on the session or allow untransactioned messages, + // we would need to keep track of the deliveries that are involved in the transaction separately. + if (has_unsettled_outgoing_deliveries(*this)) + throw proton::error("Session has unsettled outgoing deliveries, cannot declare transaction"); + + auto& txn_context = get_transaction_context(*this); + if (!txn_context) { + txn_context = std::make_unique<transaction_context>(open_coordinator_sender(*this)); + } + + // Declare txn + txn_context->state = transaction_context::State::DECLARING; + + transaction_send_ctrl(make_wrapper<sender>(txn_context->coordinator), "amqp:declare:list", std::list<proton::value>{}); +} + + +binary session::transaction_id() const { + auto& txn_context = get_transaction_context(*this); + if (txn_context) { + return txn_context->transaction_id; + } else { + return binary(); + } +} +void session::transaction_commit() { transaction_discharge(*this, false); } +void session::transaction_abort() { transaction_discharge(*this, true); } +bool session::transaction_is_declared() const { return (!transaction_is_empty(*this)) && get_transaction_context(*this)->state == transaction_context::State::DECLARED; } +error_condition session::transaction_error() const { + auto& txn_context = get_transaction_context(*this); + return txn_context ? txn_context->error ? make_wrapper(txn_context->error) : error_condition() : error_condition(); +} + } // namespace proton --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
