This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit 63559f3c97e3ed42ee82617b6626e9843415d4e7 Author: Rakhi Kumari <[email protected]> AuthorDate: Sat Oct 4 01:20:17 2025 -0400 PROTON-1442: [C++] Some examples using transactions These examples are also useful for manually testing transactions against a broker that supports them. --- cpp/examples/CMakeLists.txt | 4 +- cpp/examples/tx_recv.cpp | 150 ++++++++++++++++++++++++++++++++ cpp/examples/tx_send.cpp | 207 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 360 insertions(+), 1 deletion(-) diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 8e59f0adb..685a5ed36 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -60,7 +60,9 @@ foreach(example scheduled_send service_bus multithreaded_client - multithreaded_client_flow_control) + multithreaded_client_flow_control + tx_send + tx_recv) add_executable(${example} ${example}.cpp) target_link_libraries(${example} Proton::cpp Threads::Threads) endforeach() diff --git a/cpp/examples/tx_recv.cpp b/cpp/examples/tx_recv.cpp new file mode 100644 index 000000000..ef6c41bcd --- /dev/null +++ b/cpp/examples/tx_recv.cpp @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/message_id.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver_options.hpp> +#include <proton/source.hpp> +#include <proton/types.hpp> + +#include <iostream> +#include <string> + +class tx_recv : public proton::messaging_handler { + private: + proton::receiver receiver; + std::string conn_url_; + std::string addr_; + int total; + int batch_size; + int received = 0; + int current_batch = 0; + int batch_index = 0; + + public: + tx_recv(const std::string& u, const std::string &a, int c, int b): + conn_url_(u), addr_(a), total(c), batch_size(b) {} + + void on_container_start(proton::container &c) override { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& c) override { + // NOTE:credit_window(0) disables automatic flow control. + // We will use flow control to receive batches of messages in a transaction. + std::cout << "In this example we abort/commit transaction alternatively." << std::endl; + receiver = c.open_receiver(addr_, proton::receiver_options().credit_window(0)); + } + + void on_session_open(proton::session &s) override { + std::cout << "New session is open" << std::endl; + s.transaction_declare(); + } + + void on_session_error(proton::session &s) override { + std::cout << "Session error: " << s.error().what() << std::endl; + s.connection().close(); + } + + void on_session_transaction_declared(proton::session &s) override { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + receiver.add_credit(batch_size); + } + + void on_session_transaction_committed(proton::session &s) override { + std::cout << "Transaction commited" << std::endl; + received += current_batch; + current_batch = 0; + if (received == total) { + std::cout << "All received messages committed, closing connection." << std::endl; + s.connection().close(); + } + else { + std::cout << "Re-declaring transaction now... to receive next batch." << std::endl; + s.transaction_declare(); + } + } + + void on_session_transaction_aborted(proton::session &s) override { + std::cout << "Transaction aborted!" << std::endl; + std::cout << "Releasing all unsettled deliveries back to the broker..." << std::endl; + for (auto r: s.receivers()) { + for (auto d : r.unsettled_deliveries()) { + d.release(); + } + } + std::cout << "Re-declaring transaction now..." << std::endl; + current_batch = 0; + s.transaction_declare(); + } + + void on_message(proton::delivery &d, proton::message &msg) override { + std::cout<<"# MESSAGE: " << msg.id() <<": " << msg.body() << std::endl; + auto session = d.session(); + d.accept(); + current_batch += 1; + if (current_batch == batch_size) { + // Batch complete + if (batch_index % 2 == 1) { + std::cout << "Commiting transaction..." << std::endl; + session.transaction_commit(); + } else { + std::cout << "Aborting transaction..." << std::endl; + session.transaction_abort(); + } + batch_index++; + } + } +}; + +int main(int argc, char **argv) { + std::string conn_url = "//127.0.0.1:5672"; + std::string addr = "examples"; + int message_count = 6; + int batch_size = 3; + example::options opts(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connection URL", "URL"); + opts.add_value(addr, 'a', "address", "address to receive messages from", "ADDR"); + opts.add_value(message_count, 'm', "messages", "number of messages to receive", "COUNT"); + opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); + + try { + opts.parse(); + + tx_recv recv(conn_url, addr, message_count, batch_size); + proton::container(recv).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} diff --git a/cpp/examples/tx_send.cpp b/cpp/examples/tx_send.cpp new file mode 100644 index 000000000..518b76479 --- /dev/null +++ b/cpp/examples/tx_send.cpp @@ -0,0 +1,207 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/message_id.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/sender_options.hpp> +#include <proton/target_options.hpp> +#include <proton/types.hpp> + +#include <iostream> +#include <map> +#include <string> + +class tx_send : public proton::messaging_handler { + private: + std::string conn_url_; + std::string addr_; + proton::sender sender; + int total; + int batch_size; + int abort_message; + int accepted = 0; + int total_accepted = 0; + int batch_index = 0; + int current_batch = 0; + int committed = 0; + int msg_id = 10000; + int err_id; + + public: + tx_send(const std::string& u, const std::string& a, int t, int b, int c, int e): + conn_url_(u), addr_(a), total(t), batch_size(b), abort_message(c), err_id(e ? msg_id+e : 0) {} + + void on_container_start(proton::container &c) override { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& c) override { + c.open_session(); + } + + void on_session_open(proton::session& s) override { + std::cout << "New session is open, declaring transaction now..." << std::endl; + s.open_sender("", proton::sender_options{}.target(proton::target_options{}.anonymous(true))); + } + + void on_sender_open(proton::sender& s) override { + sender = s; + s.session().transaction_declare(); + } + + void on_session_transaction_declared(proton::session& s) override { + std::cout << "Transaction is declared: " << s.transaction_id() << std::endl; + send(sender); + } + + void on_session_error(proton::session &s) override { + std::cout << "Session error: " << s.error().what() << std::endl; + s.connection().close(); + } + + void on_session_transaction_error(proton::session &s) override { + std::cout << "Transaction error!" << s.transaction_error().what() << std::endl; + s.connection().close(); + } + + void on_sendable(proton::sender& sender) override { + proton::session session = sender.session(); + send(sender); + } + + void send(proton::sender& sender) { + while (sender.session().transaction_is_declared() && sender.credit() && + current_batch < batch_size) { + proton::message msg; + + msg.id(msg_id++); + if (msg_id != err_id) { + msg.to(addr_); + } + msg.body(std::map<std::string, int>{{"batch", batch_index}, {"index", current_batch}}); + std::cout << "Sending [" << batch_index << ", " << current_batch << "]: " << msg << std::endl; + sender.send(msg); + current_batch += 1; + } + } + + void on_transactional_accept(proton::tracker &t) override { + accepted++; + total_accepted++; + if (total_accepted == abort_message) { + t.session().transaction_abort(); + } else if (accepted == batch_size) { + t.session().transaction_commit(); + } + } + + void on_transactional_reject(proton::tracker &t) override { + std::cout << "Delivery rejected!" << std::endl; + t.session().transaction_abort(); + } + + void on_transactional_release(proton::tracker &t) override { + std::cout << "Delivery released!" << std::endl; + t.session().transaction_abort(); + } + + void on_session_transaction_committed(proton::session &s) override { + committed += accepted; + batch_index++; + std::cout << "Transaction committed" << std::endl; + if (committed >= total) { + std::cout << committed << " messages committed, closing connection." << std::endl; + s.connection().close(); + } else { + current_batch = 0; + accepted = 0; + std::cout << "Re-declaring transaction now..." << std::endl; + s.transaction_declare(); + } + } + + // The committed messages will be settled by the broker, we should settle them too. + void on_tracker_settle(proton::tracker &t) override { + std::cout << "Broker settled tracker: " << t.tag() << std::endl; + } + + void on_session_transaction_aborted(proton::session &s) override { + std::cout << "Transaction aborted!" << std::endl; + // Check if this was a failed commit + auto error = s.transaction_error(); + if (error) { + std::cout << "Transaction error: " << error.what() << std::endl; + s.connection().close(); + } + + // Don't close the connection if we deliberately injected an abort + if (abort_message == 0) { + s.connection().close(); + } else { + current_batch = 0; + accepted = 0; + std::cout << "Re-declaring transaction now..." << std::endl; + s.transaction_declare(); + } + } + + void on_sender_close(proton::sender &s) override { + current_batch = 0; + } + +}; + +int main(int argc, char **argv) { + std::string conn_url = "//127.0.0.1:5672"; + std::string addr = "examples"; + int message_count = 6; + int batch_size = 3; + int abort_message = 0; + int error_message = 0; + example::options opts(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connect and send to URL", "URL"); + opts.add_value(addr, 'a', "address", "connect and send to address", "ADDR"); + opts.add_value(message_count, 'm', "messages", "number of messages to send", "COUNT"); + opts.add_value(batch_size, 'b', "batch_size", "number of messages in each transaction", "BATCH_SIZE"); + opts.add_value(abort_message, 'c', "abort_message", "message number to abort the transaction", "ABORT_MESSAGE"); + opts.add_value(error_message, 'e', "error_message", "message number to send in error", "ERROR_MESSAGE"); + + try { + opts.parse(); + + tx_send send(conn_url, addr, message_count, batch_size, abort_message, error_message); + proton::container(send).run(); + + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
