This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit b6673de70e5d0d0d9d09a35c1dbf657f675bafbb
Author: Rakhi Kumari <[email protected]>
AuthorDate: Sat Oct 4 01:20:17 2025 -0400

    PROTON-1442: [C++] Implement local transactions
    
    Implement handling for declaring and discharging transactions on AMQP
    sessions.
    
    Added new callbacks which allow the application to respond to
    transaction events: declaring, committing and aborting transactions;
    provisionally accepting, rejecting & releasing deliveries in a
    transaction.
    
    This code was written with the assistance of Cursor.
---
 cpp/include/proton/messaging_handler.hpp |  31 +++++
 cpp/include/proton/session.hpp           |  27 ++++-
 cpp/src/contexts.cpp                     |   6 +
 cpp/src/contexts.hpp                     |  23 +++-
 cpp/src/delivery.cpp                     |  11 ++
 cpp/src/handler.cpp                      |  10 ++
 cpp/src/messaging_adapter.cpp            | 187 ++++++++++++++++++++++++++++---
 cpp/src/node_options.cpp                 |   2 -
 cpp/src/sender.cpp                       |   9 ++
 cpp/src/session.cpp                      | 114 +++++++++++++++++++
 10 files changed, 399 insertions(+), 21 deletions(-)

diff --git a/cpp/include/proton/messaging_handler.hpp 
b/cpp/include/proton/messaging_handler.hpp
index 213dbe73e..0754b0c77 100644
--- a/cpp/include/proton/messaging_handler.hpp
+++ b/cpp/include/proton/messaging_handler.hpp
@@ -172,6 +172,37 @@ PN_CPP_CLASS_EXTERN messaging_handler {
     /// The remote peer closed the session with an error condition.
     PN_CPP_EXTERN virtual void on_session_error(session&);
 
+    /// **Unsettled API** - Called when a local transaction is declared.
+    PN_CPP_EXTERN virtual void on_session_transaction_declared(session&);
+
+    /// **Unsettled API** - Called when a local transaction is discharged 
successfully.
+    PN_CPP_EXTERN virtual void on_session_transaction_committed(session&);
+
+    /// **Unsettled API** - Called when a local transaction is discharged 
unsuccessfully (aborted).
+    /// This is either due to an explicit abort or a failure during commit.
+    /// In either case any action taken under the transaction is as if it never
+    /// happened.
+    PN_CPP_EXTERN virtual void on_session_transaction_aborted(session&);
+
+    /// **Unsettled API** - Called when a local transaction operation fails.
+    PN_CPP_EXTERN virtual void on_session_transaction_error(session&);
+
+    /// **Unsettled API** - Called when a transactioned delivery is 
provisionally accepted.
+    /// This means that if the transaction successfully commits the delivery
+    /// will be accepted. If the transaction aborts the delivery will be
+    /// as if it never happened.
+    PN_CPP_EXTERN virtual void on_transactional_accept(tracker&);
+
+    /// **Unsettled API** - Called when a transactioned delivery is 
provisionally rejected.
+    /// This means that if the transaction successfully commits the delivery
+    /// will be rejected.
+    PN_CPP_EXTERN virtual void on_transactional_reject(tracker&);
+
+    /// **Unsettled API** - Called when a transactioned delivery is 
provisionally released.
+    /// This means that if the transaction successfully commits the delivery
+    /// will be released (including modifying the delivery count).
+    PN_CPP_EXTERN virtual void on_transactional_release(tracker&);
+
     /// The remote peer opened the link.
     PN_CPP_EXTERN virtual void on_receiver_open(receiver&);
 
diff --git a/cpp/include/proton/session.hpp b/cpp/include/proton/session.hpp
index ea7d3bbc1..24438caa9 100644
--- a/cpp/include/proton/session.hpp
+++ b/cpp/include/proton/session.hpp
@@ -105,14 +105,33 @@ PN_CPP_CLASS_EXTERN session : public 
internal::object<pn_session_t>, public endp
     /// Get user data from this session.
     PN_CPP_EXTERN void* user_data() const;
 
+    /// **Unsettled API** - Declare a new local transaction on this session.
+    PN_CPP_EXTERN void transaction_declare();
+
+    /// **Unsettled API** - Commit the currently declared transaction.
+    PN_CPP_EXTERN void transaction_commit();
+
+    /// **Unsettled API** - Abort the currently declared transaction.
+    PN_CPP_EXTERN void transaction_abort();
+
+    /// **Unsettled API** - Return true if a transaction is currently declared.
+    PN_CPP_EXTERN bool transaction_is_declared() const;
+
+    /// **Unsettled API** - Return the identifier of the current transaction.
+    PN_CPP_EXTERN binary transaction_id() const;
+
+    /// **Unsettled API** - Return the error condition associated with 
transaction.
+    PN_CPP_EXTERN error_condition transaction_error() const;
+
     /// @cond INTERNAL
-  friend class internal::factory<session>;
-  friend class session_iterator;
+    friend class internal::factory<session>;
+    friend class sender;
+    friend class session_iterator;
     /// @endcond
 };
 
 /// @cond INTERNAL
-    
+
 /// An iterator of sessions.
 class session_iterator : public internal::iter_base<session, session_iterator> 
{
   public:
@@ -126,7 +145,7 @@ class session_iterator : public 
internal::iter_base<session, session_iterator> {
 typedef internal::iter_range<session_iterator> session_range;
 
 /// @endcond
-    
+
 } // proton
 
 #endif // PROTON_SESSION_HPP
diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp
index a3b393968..6c2838da7 100644
--- a/cpp/src/contexts.cpp
+++ b/cpp/src/contexts.cpp
@@ -87,6 +87,12 @@ link_context& link_context::get(pn_link_t* l) {
     return ref<link_context>(id(pn_link_attachments(l), LINK_CONTEXT));
 }
 
+transaction_context::transaction_context(pn_link_t* coordinator_sender) :
+  coordinator(coordinator_sender)
+{}
+
+session_context::session_context() : handler(nullptr), user_data_(nullptr) {}
+
 session_context& session_context::get(pn_session_t* s) {
     return ref<session_context>(id(pn_session_attachments(s), 
SESSION_CONTEXT));
 }
diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp
index 7ebab1d7b..e85ff06f5 100644
--- a/cpp/src/contexts.hpp
+++ b/cpp/src/contexts.hpp
@@ -28,6 +28,7 @@
 #include "proton/message.hpp"
 
 #include "proton/object.h"
+#include "proton/condition.h"
 
 #include <memory>
 
@@ -147,15 +148,35 @@ class link_context : public context {
     void* user_data_;
 };
 
+class transaction_context;
+
 class session_context : public context {
   public:
-    session_context() : handler(0), user_data_(nullptr) {}
+    session_context();
     static session_context& get(pn_session_t* s);
 
     messaging_handler* handler;
+    std::unique_ptr<transaction_context> transaction_context_;
     void* user_data_;
 };
 
+// This is not a context object on its own, but an optional part of session
+class transaction_context {
+  public:
+    transaction_context(pn_link_t* coordinator);
+    pn_link_t* coordinator;
+    pn_condition_t* error = nullptr;
+    binary transaction_id;
+    bool failed = false;
+    enum class State {
+      NO_TRANSACTION,
+      DECLARING,
+      DECLARED,
+      DISCHARGING,
+    };
+    State state = State::NO_TRANSACTION;
+};
+
 class transfer_context : public context {
   public:
     transfer_context() : user_data_(nullptr) {}
diff --git a/cpp/src/delivery.cpp b/cpp/src/delivery.cpp
index 654f50a36..102eca671 100644
--- a/cpp/src/delivery.cpp
+++ b/cpp/src/delivery.cpp
@@ -33,9 +33,20 @@
 
 #include "proton/binary.hpp"
 
+#include <proton/session.hpp>
+
 namespace {
 
 void settle_delivery(pn_delivery_t* o, uint64_t state) {
+    proton::session session = proton::make_wrapper(o).session();
+    if(session.transaction_is_declared()) {
+        // Transactional disposition
+        auto disp = pn_transactional_disposition(pn_delivery_local(o));
+        pn_transactional_disposition_set_id(disp, 
pn_bytes(session.transaction_id()));
+        pn_transactional_disposition_set_outcome_type(disp, state);
+        pn_delivery_update(o, PN_TRANSACTIONAL_STATE);
+        return;
+    }
     pn_delivery_update(o, state);
     pn_delivery_settle(o);
 }
diff --git a/cpp/src/handler.cpp b/cpp/src/handler.cpp
index 1632efda6..152c519b2 100644
--- a/cpp/src/handler.cpp
+++ b/cpp/src/handler.cpp
@@ -65,6 +65,16 @@ void messaging_handler::on_session_open(session &s) {
         pn_session_open(unwrap(s));
     }
 }
+
+void messaging_handler::on_session_transaction_declared(session &) {}
+void messaging_handler::on_session_transaction_committed(session &) {}
+void messaging_handler::on_session_transaction_aborted(session &) {}
+void messaging_handler::on_session_transaction_error(session &s) { 
on_session_error(s); }
+
+void messaging_handler::on_transactional_accept(tracker &) {}
+void messaging_handler::on_transactional_reject(tracker &) {}
+void messaging_handler::on_transactional_release(tracker &) {}
+
 void messaging_handler::on_receiver_close(receiver &) {}
 void messaging_handler::on_receiver_error(receiver &l) { on_error(l.error()); }
 void messaging_handler::on_receiver_open(receiver &l) {
diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp
index f90cd7613..27ce28a58 100644
--- a/cpp/src/messaging_adapter.cpp
+++ b/cpp/src/messaging_adapter.cpp
@@ -22,14 +22,11 @@
 #include "messaging_adapter.hpp"
 
 #include "proton/connection.hpp"
-#include "proton/container.hpp"
 #include "proton/delivery.hpp"
 #include "proton/error.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/receiver.hpp"
-#include "proton/receiver_options.hpp"
 #include "proton/sender.hpp"
-#include "proton/sender_options.hpp"
 #include "proton/session.hpp"
 #include "proton/tracker.hpp"
 #include "proton/transport.hpp"
@@ -37,6 +34,8 @@
 #include "contexts.hpp"
 #include "msg.hpp"
 #include "proton_bits.hpp"
+#include "tracing_private.hpp"
+#include "types_internal.hpp"
 
 #include <proton/connection.h>
 #include <proton/delivery.h>
@@ -45,8 +44,7 @@
 #include <proton/message.h>
 #include <proton/session.h>
 #include <proton/transport.h>
-
-#include "tracing_private.hpp"
+#include <proton/types.h>
 
 #include <assert.h>
 
@@ -69,7 +67,7 @@ void on_link_flow(messaging_handler& handler, pn_event_t* 
event) {
     // TODO: process session flow data, if no link-specific data, just return.
     if (!lnk) return;
     int state = pn_link_state(lnk);
-    if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) {
+    if (((state & PN_LOCAL_ACTIVE) && (state & PN_REMOTE_ACTIVE))) {
         link_context& lctx = link_context::get(lnk);
         if (pn_link_is_sender(lnk)) {
             if (pn_link_credit(lnk) > 0) {
@@ -110,6 +108,151 @@ void message_decode(message& msg, proton::delivery 
delivery) {
     pn_link_advance(unwrap(link));
 }
 
+bool transaction_coordinator_sender(const sender& s) {
+    auto& txn_context = 
session_context::get(unwrap(s.session())).transaction_context_;
+    return txn_context && (txn_context->coordinator == unwrap(s));
+}
+
+void settle_outgoing_deliveries(session& s) {
+    // If the transaction is aborted, then for any messages that we sent that 
are still unsettled, we need to settle them
+    // There is no need for a disposition update beyond that as the broker 
will have already done the abort.
+    // We do need to know the deliveries that are involved in the transaction 
so we can settle them - since we
+    // scope our transactions to a session, we will settle all outgoing 
deliveries for the session.
+    auto session = unwrap(s);
+    auto& transaction_context = 
session_context::get(session).transaction_context_;
+    auto coordinator = transaction_context->coordinator;
+    for (auto link = pn_link_head(pn_session_connection(session), 0);
+         link;
+         link = pn_link_next(link, 0)) {
+        if (pn_link_is_sender(link) && pn_link_session(link) == session && 
link != coordinator) {
+            for (auto delivery = pn_unsettled_head(link);
+                 delivery;
+                 delivery = pn_unsettled_next(delivery)) {
+                pn_delivery_settle(delivery);
+            }
+        }
+    }
+}
+
+void settle_incoming_deliveries(session& s) {
+    // When the transaction commits, settle all incoming deliveries (on 
receiver links) that we
+    // had sent a provisional disposition for. They are now definitively 
accepted/rejected/released.
+    auto session = unwrap(s);
+    for (auto link = pn_link_head(pn_session_connection(session), 0);
+         link;
+         link = pn_link_next(link, 0)) {
+        if (pn_link_is_receiver(link) && pn_link_session(link) == session) {
+            for (auto delivery = pn_unsettled_head(link);
+                 delivery;
+                 delivery = pn_unsettled_next(delivery)) {
+                pn_delivery_settle(delivery);
+            }
+        }
+    }
+}
+
+void handle_transaction_coordinator_outcome(messaging_handler& handler, const 
tracker& t) {
+    auto session = t.session();
+    auto& session_context = session_context::get(unwrap(session));
+    auto& transaction_context = session_context.transaction_context_;
+    auto state = transaction_context->state;
+    auto disposition = pn_delivery_remote(unwrap(t));
+    if (auto *declared_disp = pn_declared_disposition(disposition); 
declared_disp) {
+        switch (state) {
+          case transaction_context::State::DECLARING: {
+            pn_bytes_t txn_id = pn_declared_disposition_get_id(declared_disp);
+            transaction_context->transaction_id = proton::bin(txn_id);
+            transaction_context->state = transaction_context::State::DECLARED;
+            handler.on_session_transaction_declared(session);
+            return;
+          }
+          case transaction_context::State::NO_TRANSACTION:
+          case transaction_context::State::DECLARED:
+          case transaction_context::State::DISCHARGING:
+            // Don't throw error here, instead close link with error
+            
make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed",
 "Received transaction declared disposition in invalid state"});
+            transaction_context.reset();
+        }
+    } else if (pn_disposition_type(disposition) == PN_ACCEPTED) {
+        switch (state) {
+          case transaction_context::State::DISCHARGING: {
+            if (transaction_context->failed) {
+                // Transaction abort is successful
+                transaction_context->state = 
transaction_context::State::NO_TRANSACTION;
+                settle_outgoing_deliveries(session);
+                handler.on_session_transaction_aborted(session);
+                return;
+            } else {
+                // Transaction commit is successful
+                transaction_context->state = 
transaction_context::State::NO_TRANSACTION;
+                settle_incoming_deliveries(session);
+                handler.on_session_transaction_committed(session);
+                return;
+            }
+          }
+          case transaction_context::State::NO_TRANSACTION:
+          case transaction_context::State::DECLARING:
+          case transaction_context::State::DECLARED:
+            // TODO: Don't throw error here, instead detach link or close 
session?
+            
make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed",
 "Received transaction accepted disposition in invalid state"});
+            transaction_context.reset();
+        }
+    } else if (auto rejected_disp = pn_rejected_disposition(disposition); 
rejected_disp) {
+        switch (state) {
+          case transaction_context::State::DECLARING:
+            transaction_context->state = 
transaction_context::State::NO_TRANSACTION;
+            transaction_context->error = 
pn_rejected_disposition_condition(rejected_disp);
+            handler.on_session_transaction_error(session);
+            transaction_context->error = nullptr;
+            return;
+          case transaction_context::State::DISCHARGING:
+            // Note that rollback cannot fail in AMQP as the outcome would be 
the same,
+            // so don't count rejected discharge as an error (although it is a 
protocol error).
+            if (!transaction_context->failed) {
+                transaction_context->state = 
transaction_context::State::NO_TRANSACTION;
+                settle_outgoing_deliveries(session);
+                transaction_context->error = 
pn_rejected_disposition_condition(rejected_disp);
+                handler.on_session_transaction_aborted(session);
+                transaction_context->error = nullptr;
+                return;
+            }
+          case transaction_context::State::NO_TRANSACTION:
+          case transaction_context::State::DECLARED:
+            // TODO: Don't throw error here, instead detach link or close 
session?
+            
make_wrapper(transaction_context->coordinator).close(error_condition{"amqp:not-allowed",
 "Received transaction rejected disposition in invalid state"});
+            transaction_context.reset();
+        }
+    }
+    // TODO: Don't ignore unexpected disposition here, instead detach link or 
close session?
+}
+
+void handle_transacted_delivery_outcome(messaging_handler& handler, 
pn_delivery_t* d, tracker& t) {
+    // Don't call handlers if we're in the process of committing or aborting 
the transaction.
+    // Because we will get settlement updates at that point.
+    auto& session_context = session_context::get(unwrap(t.session()));
+    auto& transaction_context = session_context.transaction_context_;
+    if (transaction_context->state != transaction_context::State::DECLARED) {
+        return;
+    }
+
+    auto disposition = pn_transactional_disposition(pn_delivery_remote(d));
+    auto outcome_type = 
pn_transactional_disposition_get_outcome_type(disposition);
+    switch (outcome_type) {
+        case PN_ACCEPTED:
+            handler.on_transactional_accept(t);
+            break;
+        case PN_REJECTED:
+            handler.on_transactional_reject(t);
+            break;
+        case PN_RELEASED:
+        case PN_MODIFIED:
+            handler.on_transactional_release(t);
+            break;
+        default:
+            break;
+    }
+}
+
 void on_delivery(messaging_handler& handler, pn_event_t* event) {
     pn_link_t *lnk = pn_event_link(event);
     pn_delivery_t *dlv = pn_event_delivery(event);
@@ -165,6 +308,12 @@ void on_delivery(messaging_handler& handler, pn_event_t* 
event) {
         // sender
         if (pn_delivery_updated(dlv)) {
             tracker t(make_wrapper<tracker>(dlv));
+            // Check for outcome from a transaction coordinator
+            if (transaction_coordinator_sender(t.sender())) {
+                handle_transaction_coordinator_outcome(handler, t);
+                t.settle();
+                return;
+            }
             ot.on_settled_span(t);
             switch(pn_delivery_remote_state(dlv)) {
             case PN_ACCEPTED:
@@ -177,6 +326,9 @@ void on_delivery(messaging_handler& handler, pn_event_t* 
event) {
             case PN_MODIFIED:
                 handler.on_tracker_release(t);
                 break;
+            case PN_TRANSACTIONAL_STATE:
+                handle_transacted_delivery_outcome(handler, dlv, t);
+                break;
             }
             if (t.settled()) {
                 handler.on_tracker_settle(t);
@@ -274,13 +426,6 @@ void on_link_local_open(messaging_handler& handler, 
pn_event_t* event) {
 
 void on_link_remote_open(messaging_handler& handler, pn_event_t* event) {
     auto lnk = pn_event_link(event);
-    // Currently don't implement (transaction) coordinator
-    if (pn_terminus_get_type(pn_link_remote_target(lnk))==PN_COORDINATOR) {
-      auto error = pn_link_condition(lnk);
-      pn_condition_set_name(error, "amqp:not-implemented");
-      pn_link_close(lnk);
-      return;
-    }
     if (pn_link_state(lnk) & PN_LOCAL_UNINIT) { // Incoming link
         // Copy source and target from remote end.
         pn_terminus_copy(pn_link_source(lnk), pn_link_remote_source(lnk));
@@ -320,10 +465,24 @@ void on_connection_wake(messaging_handler& handler, 
pn_event_t* event) {
 
 }
 
-void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event)
+void messaging_adapter::dispatch(messaging_handler& h, pn_event_t* event)
 {
     pn_event_type_t type = pn_event_type(event);
 
+    // If this is an event for an (internal) transaction coordinator link set 
the handler to a null handler
+    // Unless its a delivery event which we need to process for transaction 
outcomes
+    messaging_handler& handler = [&]() -> messaging_handler& {
+        if (pn_link_t *lnk = pn_event_link(event);
+            type != PN_DELIVERY &&
+            lnk && pn_link_is_sender(lnk) &&
+            transaction_coordinator_sender(sender(make_wrapper<sender>(lnk)))) 
{
+            static messaging_handler null_handler;
+            return null_handler;
+        } else {
+            return h;
+        }
+    }();
+
     // Only handle events we are interested in
     switch(type) {
       case PN_CONNECTION_BOUND: on_connection_bound(handler, event); break;
diff --git a/cpp/src/node_options.cpp b/cpp/src/node_options.cpp
index fd489baf3..d6ec52df1 100644
--- a/cpp/src/node_options.cpp
+++ b/cpp/src/node_options.cpp
@@ -203,6 +203,4 @@ target_options& target_options::dynamic_properties(const 
target::dynamic_propert
 
 void target_options::apply(target& s) const { impl_->apply(s); }
 
-
-
 } // namespace proton
diff --git a/cpp/src/sender.cpp b/cpp/src/sender.cpp
index 8b3942bfe..9f4f4d91d 100644
--- a/cpp/src/sender.cpp
+++ b/cpp/src/sender.cpp
@@ -23,6 +23,7 @@
 
 #include "proton/link.hpp"
 #include "proton/sender_options.hpp"
+#include "proton/session.hpp"
 #include "proton/source.hpp"
 #include "proton/target.hpp"
 #include "proton/tracker.hpp"
@@ -34,6 +35,7 @@
 #include "proton_bits.hpp"
 #include "contexts.hpp"
 #include "tracing_private.hpp"
+#include "types_internal.hpp"
 
 #include <assert.h>
 
@@ -84,6 +86,13 @@ tracker sender::send(const message &message, const binary 
&tag) {
         pn_delivery_settle(dlv);
     if (!pn_link_credit(pn_object()))
         link_context::get(pn_object()).draining = false;
+
+    // If transaction is declared
+    if (session().transaction_is_declared()) {
+        auto disp = 
pn_transactional_disposition(pn_delivery_local(unwrap(track)));
+        pn_transactional_disposition_set_id(disp, 
pn_bytes(session().transaction_id()));
+    }
+
     return track;
 }
 
diff --git a/cpp/src/session.cpp b/cpp/src/session.cpp
index b8f777a00..84956e4c2 100644
--- a/cpp/src/session.cpp
+++ b/cpp/src/session.cpp
@@ -21,15 +21,26 @@
 #include "proton/session.hpp"
 
 #include "proton/connection.hpp"
+#include "proton/container.hpp"
+#include "proton/delivery.hpp"
+#include "proton/error.hpp"
+#include "proton/messaging_handler.hpp"
 #include "proton/receiver_options.hpp"
 #include "proton/sender_options.hpp"
 #include "proton/session_options.hpp"
+#include "proton/target_options.hpp"
+#include "proton/tracker.hpp"
+#include "proton/transfer.hpp"
+#include "proton/types.hpp"
 
 #include "contexts.hpp"
 #include "link_namer.hpp"
+#include "proactor_container_impl.hpp"
 #include "proton_bits.hpp"
+#include "types_internal.hpp"
 
 #include <proton/connection.h>
+#include "proton/delivery.h"
 #include <proton/session.h>
 
 #include <string>
@@ -148,4 +159,107 @@ void* session::user_data() const {
     return sctx.user_data_;
 }
 
+namespace {
+
+std::unique_ptr<transaction_context>& get_transaction_context(const session& 
s) {
+    return session_context::get(unwrap(s)).transaction_context_;
+}
+
+bool transaction_is_empty(const session& s) {
+    auto& txn = get_transaction_context(s);
+    return !txn || txn->state == transaction_context::State::NO_TRANSACTION;
+}
+
+proton::tracker transaction_send_ctrl(sender&& coordinator, const symbol& 
descriptor, const value& value) {
+    proton::value msg_value;
+    proton::codec::encoder enc(msg_value);
+    enc << proton::codec::start::described()
+        << descriptor
+        << value
+        << proton::codec::finish();
+
+    return coordinator.send(msg_value);
+}
+
+void transaction_discharge(const session& s, bool failed) {
+    auto& transaction_context = get_transaction_context(s);
+    if (transaction_is_empty(s) || transaction_context->state != 
transaction_context::State::DECLARED)
+        throw proton::error("Only a declared txn can be discharged.");
+    transaction_context->state = transaction_context::State::DISCHARGING;
+
+    transaction_context->failed = failed;
+    transaction_send_ctrl(
+        make_wrapper<sender>(transaction_context->coordinator),
+        "amqp:discharge:list", 
std::list<proton::value>{transaction_context->transaction_id, failed});
+}
+
+pn_link_t* open_coordinator_sender(session& s) {
+    auto l = pn_sender(unwrap(s), next_link_name(s.connection()).c_str());
+    auto t = pn_link_target(l);
+    pn_terminus_set_type(t, PN_COORDINATOR);
+    auto caps = pn_terminus_capabilities(t);
+    // As we only have a single symbol in the capabilities we don't have to 
create an array
+    pn_data_put_symbol(caps, pn_bytes("amqp:local-transactions"));
+    pn_link_open(l);
+    return l;
+}
+
+bool has_unsettled_outgoing_deliveries(const session& s) {
+    auto session = unwrap(s);
+    auto& transaction_context = get_transaction_context(s);
+    auto coordinator = transaction_context ? transaction_context->coordinator 
: nullptr;
+    auto link = pn_link_head(pn_session_connection(session), 0);
+    while (link) {
+        if (pn_link_is_sender(link) && pn_link_session(link) == session && 
link != coordinator) {
+            if (pn_link_unsettled(link) > 0)
+                return true;
+        }
+        link = pn_link_next(link, 0);
+    }
+    return false;
+}
+
+}
+
+void session::transaction_declare() {
+    if (!transaction_is_empty(*this))
+        throw proton::error("Session has already declared transaction");
+
+    // Check to see if there are any unsettled deliveries for this session
+    // This is to simplify keeping track of the deliveries that are involved 
in the transaction:
+    // The simplification is that we will only allow transactioned messages to 
be sent on this session
+    // so that we can find all the transactioned outgoing deliveries for the 
session.
+    // If we want to allow multiple transactions on the session or allow 
untransactioned messages,
+    // we would need to keep track of the deliveries that are involved in the 
transaction separately.
+    if (has_unsettled_outgoing_deliveries(*this))
+        throw proton::error("Session has unsettled outgoing deliveries, cannot 
declare transaction");
+
+    auto& txn_context = get_transaction_context(*this);
+    if (!txn_context) {
+        txn_context = 
std::make_unique<transaction_context>(open_coordinator_sender(*this));
+    }
+
+    // Declare txn
+    txn_context->state = transaction_context::State::DECLARING;
+
+    transaction_send_ctrl(make_wrapper<sender>(txn_context->coordinator), 
"amqp:declare:list", std::list<proton::value>{});
+}
+
+
+binary session::transaction_id() const { 
+    auto& txn_context = get_transaction_context(*this);
+    if (txn_context) {
+        return txn_context->transaction_id;
+    } else {
+        return binary();
+    }
+}
+void session::transaction_commit() { transaction_discharge(*this, false); }
+void session::transaction_abort() { transaction_discharge(*this, true); }
+bool session::transaction_is_declared() const { return 
(!transaction_is_empty(*this)) && get_transaction_context(*this)->state == 
transaction_context::State::DECLARED; }
+error_condition session::transaction_error() const {
+    auto& txn_context = get_transaction_context(*this);
+    return txn_context ? txn_context->error ? make_wrapper(txn_context->error) 
: error_condition() : error_condition();
+}
+
 } // namespace proton


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to