This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit 63559f3c97e3ed42ee82617b6626e9843415d4e7
Author: Rakhi Kumari <[email protected]>
AuthorDate: Sat Oct 4 01:20:17 2025 -0400

    PROTON-1442: [C++] Some examples using transactions
    
    These examples are also useful for manually testing transactions against
    a broker that supports them.
---
 cpp/examples/CMakeLists.txt |   4 +-
 cpp/examples/tx_recv.cpp    | 150 ++++++++++++++++++++++++++++++++
 cpp/examples/tx_send.cpp    | 207 ++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 360 insertions(+), 1 deletion(-)

diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 8e59f0adb..685a5ed36 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -60,7 +60,9 @@ foreach(example
     scheduled_send
     service_bus
     multithreaded_client
-    multithreaded_client_flow_control)
+    multithreaded_client_flow_control
+    tx_send
+    tx_recv)
   add_executable(${example} ${example}.cpp)
   target_link_libraries(${example} Proton::cpp Threads::Threads)
 endforeach()
diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp
new file mode 100644
index 000000000..ef6c41bcd
--- /dev/null
+++ b/cpp/examples/tx_recv.cpp
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/message_id.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/source.hpp>
+#include <proton/types.hpp>
+
+#include <iostream>
+#include <string>
+
+class tx_recv : public proton::messaging_handler {
+  private:
+    proton::receiver receiver;
+    std::string conn_url_;
+    std::string addr_;
+    int total;
+    int batch_size;
+    int received = 0;
+    int current_batch = 0;
+    int batch_index = 0;
+
+  public:
+    tx_recv(const std::string& u, const std::string &a, int c, int b):
+        conn_url_(u), addr_(a), total(c), batch_size(b) {}
+
+    void on_container_start(proton::container &c) override {
+        c.connect(conn_url_);
+    }
+
+   void on_connection_open(proton::connection& c) override {
+        // NOTE:credit_window(0) disables automatic flow control.
+        // We will use flow control to receive batches of messages in a 
transaction.
+        std::cout << "In this example we abort/commit transaction 
alternatively." << std::endl;
+        receiver = c.open_receiver(addr_, 
proton::receiver_options().credit_window(0));
+    }
+
+    void on_session_open(proton::session &s) override {
+        std::cout << "New session is open" << std::endl;
+        s.transaction_declare();
+    }
+
+    void on_session_error(proton::session &s) override {
+        std::cout << "Session error: " << s.error().what() << std::endl;
+        s.connection().close();
+    }
+
+    void on_session_transaction_declared(proton::session &s) override {
+        std::cout << "Transaction is declared: " << s.transaction_id() << 
std::endl;
+        receiver.add_credit(batch_size);
+    }
+
+    void on_session_transaction_committed(proton::session &s) override {
+        std::cout << "Transaction commited" << std::endl;
+        received += current_batch;
+        current_batch = 0;
+        if (received == total) {
+            std::cout << "All received messages committed, closing 
connection." << std::endl;
+            s.connection().close();
+        }
+        else {
+            std::cout << "Re-declaring transaction now... to receive next 
batch." << std::endl;
+            s.transaction_declare();
+        }
+    }
+
+    void on_session_transaction_aborted(proton::session &s) override {
+        std::cout << "Transaction aborted!" << std::endl;
+        std::cout << "Releasing all unsettled deliveries back to the 
broker..." << std::endl;
+        for (auto r: s.receivers()) {
+            for (auto d : r.unsettled_deliveries()) {
+                d.release();
+            }
+        }
+        std::cout << "Re-declaring transaction now..." << std::endl;
+        current_batch = 0;
+        s.transaction_declare();
+    }
+
+    void on_message(proton::delivery &d, proton::message &msg) override {
+        std::cout<<"# MESSAGE: " << msg.id() <<": "  << msg.body() << 
std::endl;
+        auto session = d.session();
+        d.accept();
+        current_batch += 1;
+        if (current_batch == batch_size) {
+            // Batch complete
+            if (batch_index % 2 == 1) {
+                std::cout << "Commiting transaction..." << std::endl;
+                session.transaction_commit();
+            } else {
+                std::cout << "Aborting transaction..." << std::endl;
+                session.transaction_abort();
+            }
+            batch_index++;
+        }
+    }
+};
+
+int main(int argc, char **argv) {
+    std::string conn_url = "//127.0.0.1:5672";
+    std::string addr = "examples";
+    int message_count = 6;
+    int batch_size = 3;
+    example::options opts(argc, argv);
+
+    opts.add_value(conn_url, 'u', "url", "connection URL", "URL");
+    opts.add_value(addr, 'a', "address", "address to receive messages from", 
"ADDR");
+    opts.add_value(message_count, 'm', "messages", "number of messages to 
receive", "COUNT");
+    opts.add_value(batch_size, 'b', "batch_size", "number of messages in each 
transaction", "BATCH_SIZE");
+
+    try {
+        opts.parse();
+
+        tx_recv recv(conn_url, addr, message_count, batch_size);
+        proton::container(recv).run();
+
+        return 0;
+    } catch (const example::bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}
diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp
new file mode 100644
index 000000000..518b76479
--- /dev/null
+++ b/cpp/examples/tx_send.cpp
@@ -0,0 +1,207 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/message.hpp>
+#include <proton/message_id.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/target_options.hpp>
+#include <proton/types.hpp>
+
+#include <iostream>
+#include <map>
+#include <string>
+
+class tx_send : public proton::messaging_handler {
+  private:
+    std::string conn_url_;
+    std::string addr_;
+    proton::sender sender;
+    int total;
+    int batch_size;
+    int abort_message;
+    int accepted = 0;
+    int total_accepted = 0;
+    int batch_index = 0;
+    int current_batch = 0;
+    int committed = 0;
+    int msg_id = 10000;
+    int err_id;
+
+  public:
+    tx_send(const std::string& u, const std::string& a, int t, int b, int c, 
int e):
+        conn_url_(u), addr_(a), total(t), batch_size(b), abort_message(c), 
err_id(e ? msg_id+e : 0) {}
+
+    void on_container_start(proton::container &c) override {
+        c.connect(conn_url_);
+    }
+
+    void on_connection_open(proton::connection& c) override {
+        c.open_session();
+    }
+
+    void on_session_open(proton::session& s) override {
+        std::cout << "New session is open, declaring transaction now..." << 
std::endl;
+        s.open_sender("", 
proton::sender_options{}.target(proton::target_options{}.anonymous(true)));
+    }
+
+    void on_sender_open(proton::sender& s) override {
+        sender = s;
+        s.session().transaction_declare();
+    }
+
+    void on_session_transaction_declared(proton::session& s) override {
+        std::cout << "Transaction is declared: " << s.transaction_id() << 
std::endl;
+        send(sender);
+    }
+
+    void on_session_error(proton::session &s) override {
+        std::cout << "Session error: " << s.error().what() << std::endl;
+        s.connection().close();
+    }
+
+    void on_session_transaction_error(proton::session &s) override {
+        std::cout << "Transaction error!" << s.transaction_error().what() << 
std::endl;
+        s.connection().close();
+    }
+
+    void on_sendable(proton::sender& sender) override {
+        proton::session session = sender.session();
+        send(sender);
+    }
+
+    void send(proton::sender& sender) {
+        while (sender.session().transaction_is_declared() && sender.credit() &&
+               current_batch < batch_size) {
+            proton::message msg;
+
+            msg.id(msg_id++);
+            if (msg_id != err_id) {
+                msg.to(addr_);
+            }
+            msg.body(std::map<std::string, int>{{"batch", batch_index}, 
{"index", current_batch}});
+            std::cout << "Sending [" << batch_index << ", " << current_batch 
<< "]: " << msg << std::endl;
+            sender.send(msg);
+            current_batch += 1;
+        }
+    }
+
+    void on_transactional_accept(proton::tracker &t) override {
+        accepted++;
+        total_accepted++;
+        if (total_accepted == abort_message) {
+            t.session().transaction_abort();
+        } else if (accepted == batch_size) {
+            t.session().transaction_commit();
+        }
+    }
+
+    void on_transactional_reject(proton::tracker &t) override {
+        std::cout << "Delivery rejected!" << std::endl;
+        t.session().transaction_abort();
+    }
+
+    void on_transactional_release(proton::tracker &t) override {
+        std::cout << "Delivery released!" << std::endl;
+        t.session().transaction_abort();
+    }
+
+    void on_session_transaction_committed(proton::session &s) override {
+        committed += accepted;
+        batch_index++;
+        std::cout << "Transaction committed" << std::endl;
+        if (committed >= total) {
+            std::cout << committed << " messages committed, closing 
connection." << std::endl;
+            s.connection().close();
+        } else {
+            current_batch = 0;
+            accepted = 0;
+            std::cout << "Re-declaring transaction now..." << std::endl;
+            s.transaction_declare();
+        }
+    }
+
+    // The committed messages will be settled by the broker, we should settle 
them too.
+    void on_tracker_settle(proton::tracker &t) override {
+        std::cout << "Broker settled tracker: " << t.tag() << std::endl;
+    }
+
+    void on_session_transaction_aborted(proton::session &s) override {
+        std::cout << "Transaction aborted!" << std::endl;
+        // Check if this was a failed commit
+        auto error = s.transaction_error();
+        if (error) {
+            std::cout << "Transaction error: " << error.what() << std::endl;
+            s.connection().close();
+        }
+
+        // Don't close the connection if we deliberately injected an abort
+        if (abort_message == 0) {
+            s.connection().close();
+        } else {
+            current_batch = 0;
+            accepted = 0;
+            std::cout << "Re-declaring transaction now..." << std::endl;
+            s.transaction_declare();
+        }
+    }
+
+    void on_sender_close(proton::sender &s) override {
+        current_batch = 0;
+    }
+
+};
+
+int main(int argc, char **argv) {
+    std::string conn_url = "//127.0.0.1:5672";
+    std::string addr = "examples";
+    int message_count = 6;
+    int batch_size = 3;
+    int abort_message = 0;
+    int error_message = 0;
+    example::options opts(argc, argv);
+
+    opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL");
+    opts.add_value(addr, 'a', "address", "connect and send to address", 
"ADDR");
+    opts.add_value(message_count, 'm', "messages", "number of messages to 
send", "COUNT");
+    opts.add_value(batch_size, 'b', "batch_size", "number of messages in each 
transaction", "BATCH_SIZE");
+    opts.add_value(abort_message, 'c', "abort_message", "message number to 
abort the transaction", "ABORT_MESSAGE");
+    opts.add_value(error_message, 'e', "error_message", "message number to 
send in error", "ERROR_MESSAGE");
+
+    try {
+        opts.parse();
+
+        tx_send send(conn_url, addr, message_count, batch_size, abort_message, 
error_message);
+        proton::container(send).run();
+
+        return 0;
+    } catch (const example::bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to