This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git

commit d90c70637fed77c18e62fb12b84613d6ffc865df
Author: Andrew Stitcher <[email protected]>
AuthorDate: Fri Mar 6 17:10:56 2026 -0500

    PROTON-1442: [C++] Interactive transaction tester
    
    You can interactively declare, commit, abort transactions;
    fetch/send messages;
    list pending unsettled messages;
    release unsettled messages
    
    This needs to be run against a broker that supports transactions
    
    Probably shouldn't be in examples as it's really a tester
    
    This code was written with the assistance of Cursor.
---
 cpp/examples/CMakeLists.txt     |   1 +
 cpp/examples/tx_interactive.cpp | 774 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 775 insertions(+)

diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt
index 685a5ed36..25b414cd6 100644
--- a/cpp/examples/CMakeLists.txt
+++ b/cpp/examples/CMakeLists.txt
@@ -61,6 +61,7 @@ foreach(example
     service_bus
     multithreaded_client
     multithreaded_client_flow_control
+    tx_interactive
     tx_send
     tx_recv)
   add_executable(${example} ${example}.cpp)
diff --git a/cpp/examples/tx_interactive.cpp b/cpp/examples/tx_interactive.cpp
new file mode 100644
index 000000000..d48b21585
--- /dev/null
+++ b/cpp/examples/tx_interactive.cpp
@@ -0,0 +1,774 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/connection.hpp>
+#include <proton/container.hpp>
+#include <proton/delivery.hpp>
+#include <proton/duration.hpp>
+#include <proton/message.hpp>
+#include <proton/messaging_handler.hpp>
+#include <proton/receiver.hpp>
+#include <proton/receiver_options.hpp>
+#include <proton/sender.hpp>
+#include <proton/sender_options.hpp>
+#include <proton/session.hpp>
+#include <proton/target_options.hpp>
+#include <proton/tracker.hpp>
+#include <proton/transfer.hpp>
+#include <proton/types.hpp>
+#include <proton/work_queue.hpp>
+
+#include <algorithm>
+#include <condition_variable>
+#include <cstddef>
+#include <exception>
+#include <iostream>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <string_view>
+#include <thread>
+#include <vector>
+
+#if defined(_WIN32) || defined(_WIN64)
+#  include <windows.h>
+#else
+#  include <csignal>
+#endif
+
+// Sentinel queue name: omit message 'to' address so the peer may reject the 
message.
+constexpr std::string_view NO_TO_ADDRESS = "<none>";
+
+// Interactive tester for AMQP transactions: declare/commit/abort 
transactions, receive
+// messages (with optional timeout), send to the default or a given queue (or 
omit 'to'
+// for rejection testing), wait for disposition updates, and release 
unsettled. Type
+// 'help' for commands. Received messages are accepted on receipt.
+
+class tx_recv_interactive : public proton::messaging_handler {
+  private:
+    std::string conn_url_;
+    std::string addr_;
+
+    proton::connection connection_;
+    proton::receiver receiver_;
+    proton::sender sender_;
+    proton::session session_;
+    proton::work_queue* work_queue_ = nullptr;
+
+    int send_pending_ = 0;
+    int send_next_id_ = 0;
+    std::string send_to_addr_;
+
+    mutable std::mutex wait_mutex_;
+    std::condition_variable wait_cv_;
+    bool ready_ = false;
+    bool sleep_done_ = true;
+    bool timed_out_ = false;
+    int fetch_expected_ = 0;
+    int fetch_received_ = 0;
+    bool fetch_done_ = false;
+    int settled_expected_ = 0;
+    int settled_received_ = 0;
+    bool settled_done_ = false;
+    bool interrupt_requested_ = false;
+    std::string last_error_;
+
+    /// Run f() in a try block; on error, record the message for later 
reporting.
+    template <typename F>
+    void catch_any_error(F&& f) {
+        try {
+            f();
+        } catch (const std::exception& e) {
+            auto l = std::lock_guard(wait_mutex_);
+            last_error_ = e.what();
+        }
+    }
+
+    std::mutex sync_mutex_;
+    std::condition_variable sync_cv_;
+    bool sync_done_ = false;
+
+    void timeout_fired() {
+        auto l = std::lock_guard(wait_mutex_);
+        timed_out_ = true;
+        wait_cv_.notify_all();
+    }
+
+    void do_declare() {
+        catch_any_error([this]() { session_.transaction_declare(); });
+    }
+
+    void do_fetch(int n) {
+        receiver_.add_credit(n);
+    }
+
+    void do_commit() {
+        catch_any_error([this]() { session_.transaction_commit(); });
+    }
+
+    void do_abort() {
+        catch_any_error([this]() { session_.transaction_abort(); });
+    }
+
+    void do_release() {
+        for (auto rcv : session_.receivers()) {
+            for (auto d : rcv.unsettled_deliveries()) {
+                d.release();
+            }
+        }
+    }
+
+    void do_quit() {
+        connection_.close();
+    }
+
+  public:
+    tx_recv_interactive(const std::string& url, const std::string& addr)
+        : conn_url_(url), addr_(addr) {}
+
+    void on_container_start(proton::container& c) override {
+        c.connect(conn_url_);
+    }
+
+    void on_connection_open(proton::connection& conn) override {
+        connection_ = conn;
+        work_queue_ = &conn.work_queue();
+        // credit_window(0) so we control flow via "fetch"
+        receiver_ = conn.open_receiver(addr_, 
proton::receiver_options().credit_window(0));
+        sender_ = conn.open_sender("", 
proton::sender_options{}.target(proton::target_options{}.anonymous(true)));
+    }
+
+    void on_sender_open(proton::sender& s) override {
+        sender_ = s;
+    }
+
+    void on_session_open(proton::session& s) override {
+        session_ = s;
+        {
+            auto l = std::lock_guard(wait_mutex_);
+            ready_ = true;
+        }
+        wait_cv_.notify_all();
+    }
+
+    void on_session_transaction_declared(proton::session& s) override {
+        std::cout << "transaction declared: " << s.transaction_id() << 
std::endl;
+    }
+
+    void on_session_transaction_committed(proton::session& s) override {
+        std::cout << "transaction committed" << std::endl;
+    }
+
+    void on_session_transaction_aborted(proton::session& s) override {
+        std::cout << "transaction aborted: " << s.transaction_error().what() 
<< std::endl;
+    }
+
+    void on_session_transaction_error(proton::session& s) override {
+        std::cout << "transaction error: " << s.transaction_error().what() << 
std::endl;
+    }
+
+    void on_sendable(proton::sender&) override {
+        try_send();
+    }
+
+    void try_send() {
+        int to_send = 0;
+        std::string to_addr;
+        {
+            auto l = std::lock_guard(wait_mutex_);
+            to_send = send_pending_;
+            to_addr = send_to_addr_;
+        }
+        int sent = 0;
+        while (sender_ && sender_.credit() > 0 && to_send > 0) {
+            proton::message msg;
+            msg.id(send_next_id_);
+            if (to_addr != NO_TO_ADDRESS)
+                msg.to(to_addr);
+            msg.body(std::map<std::string, int>{{"message", send_next_id_}});
+            sender_.send(msg);
+            ++send_next_id_;
+            ++sent;
+            --to_send;
+        }
+        if (sent > 0) {
+            auto l = std::lock_guard(wait_mutex_);
+            send_pending_ -= sent;
+            if (send_pending_ == 0)
+                wait_cv_.notify_all();
+        }
+    }
+
+    void on_message(proton::delivery& d, proton::message& msg) override {
+        std::cout << d.tag() << ": " << msg.body() << std::endl;
+        d.accept();
+        {
+            auto l = std::lock_guard(wait_mutex_);
+            if (fetch_expected_ > 0) {
+                ++fetch_received_;
+                if (fetch_received_ >= fetch_expected_) {
+                    fetch_done_ = true;
+                    wait_cv_.notify_all();
+                }
+            }
+            // Pre-settled deliveries never trigger on_delivery_settle; count 
them here for wait_settled
+            if (settled_expected_ > 0 && d.settled()) {
+                ++settled_received_;
+                if (settled_received_ >= settled_expected_) {
+                    settled_done_ = true;
+                    wait_cv_.notify_all();
+                }
+            }
+        }
+    }
+
+    void on_delivery_settle(proton::delivery&) override {
+        auto l = std::lock_guard(wait_mutex_);
+        if (settled_expected_ > 0) {
+            ++settled_received_;
+            if (settled_received_ >= settled_expected_) {
+                settled_done_ = true;
+                wait_cv_.notify_all();
+            }
+        }
+    }
+
+    void on_transactional_accept(proton::tracker& t) override {
+        std::cout << "disposition: accepted: " << t.tag() << " (transactional: 
" << t.session().transaction_id() << ")" << std::endl;
+    }
+
+    void on_transactional_reject(proton::tracker& t) override {
+        std::cout << "disposition: rejected: " << t.tag() << " (transactional: 
" << t.session().transaction_id() << ")" << std::endl;
+    }
+
+    void on_transactional_release(proton::tracker& t) override {
+        std::cout << "disposition: released: " << t.tag() << " (transactional: 
" << t.session().transaction_id() << ")" << std::endl;
+    }
+
+    void on_tracker_accept(proton::tracker& t) override {
+        std::cout << "disposition: accepted: " << t.tag() << std::endl;
+    }
+
+    void on_tracker_reject(proton::tracker& t) override {
+        std::cout << "disposition: rejected: " << t.tag() << std::endl;
+    }
+
+    void on_tracker_release(proton::tracker& t) override {
+        std::cout << "disposition: released: " << t.tag() << std::endl;
+    }
+
+    void on_tracker_settle(proton::tracker& t) override {
+        std::cout << "disposition: settled: " << t.tag() << std::endl;
+    }
+
+    void on_session_error(proton::session& s) override {
+        std::cout << "Session error: " << s.error().what() << std::endl;
+        s.connection().close();
+    }
+
+    /// Called from the SIGINT-handling thread to wake any current wait.
+    void request_interrupt() {
+        {
+            auto l = std::lock_guard(wait_mutex_);
+            interrupt_requested_ = true;
+            wait_cv_.notify_all();
+        }
+        {
+            auto l = std::lock_guard(sync_mutex_);
+            sync_cv_.notify_all();
+        }
+    }
+
+    /// True if the last wait was exited due to Ctrl-C.
+    bool interrupted() const {
+        auto l = std::lock_guard(wait_mutex_);
+        return interrupt_requested_;
+    }
+
+    /// Clear the interrupt flag (e.g. after the command loop has handled it).
+    void clear_interrupt() {
+        auto l = std::lock_guard(wait_mutex_);
+        interrupt_requested_ = false;
+    }
+
+    // Thread-safe: wait until handler is ready to accept commands
+    void wait_ready() {
+        auto l = std::unique_lock(wait_mutex_);
+        interrupt_requested_ = false;
+        wait_cv_.wait(l, [this] { return ready_ || interrupt_requested_; });
+    }
+
+    /// Wait until the connection thread has processed all work queued so far.
+    /// There is no "loop is idle" callback in the Proton API; this adds a
+    /// sentinel work item and returns when it runs (so the connection thread
+    /// has caught up). Use before showing the prompt so the command loop stays
+    /// in sync with background connection-thread activity.
+    void sync_with_connection_thread() {
+        {
+            auto l = std::lock_guard(wait_mutex_);
+            interrupt_requested_ = false;
+        }
+        {
+            auto l = std::lock_guard(sync_mutex_);
+            sync_done_ = false;
+        }
+        work_queue_->add([this]() {
+            auto l = std::lock_guard(sync_mutex_);
+            sync_done_ = true;
+            sync_cv_.notify_all();
+        });
+        auto l = std::unique_lock(sync_mutex_);
+        sync_cv_.wait(l, [this] {
+            if (sync_done_) return true;
+            auto w = std::lock_guard(wait_mutex_);
+            return interrupt_requested_;
+        });
+    }
+
+    /// Schedule a callback on the connection thread after a delay (seconds).
+    /// wait_sleep_done() blocks until that callback has run.
+    void sleep(double seconds) {
+        auto l = std::unique_lock(wait_mutex_);
+        interrupt_requested_ = false;
+        sleep_done_ = false;
+        l.unlock();
+        auto ms = static_cast<proton::duration::numeric_type>(seconds * 1000);
+        work_queue_->schedule(proton::duration(ms), [this]() {
+            auto l = std::lock_guard(wait_mutex_);
+            sleep_done_ = true;
+            wait_cv_.notify_all();
+        });
+        l.lock();
+        wait_cv_.wait(l, [this] { return sleep_done_ || interrupt_requested_; 
});
+    }
+
+    /// Start fetching n messages; optionally timeout after timeout_seconds (0 
= no timeout).
+    /// wait_fetch_done() blocks until n messages received or timeout 
(whichever first).
+    void fetch(int n, double timeout_seconds) {
+        auto l = std::unique_lock(wait_mutex_);
+        interrupt_requested_ = false;
+        timed_out_ = false;
+        fetch_expected_ = n;
+        fetch_received_ = 0;
+        fetch_done_ = (n <= 0);
+        l.unlock();
+        work_queue_->add([this, n]() { do_fetch(n); });
+        if (timeout_seconds > 0) {
+            auto ms = 
static_cast<proton::duration::numeric_type>(timeout_seconds * 1000);
+            work_queue_->schedule(proton::duration(ms), [this]() { 
timeout_fired(); });
+        }
+        l.lock();
+        wait_cv_.wait(l, [this] { return fetch_done_ || timed_out_ || 
interrupt_requested_; });
+        fetch_expected_ = 0;
+    }
+    int fetch_received_count() const {
+        auto l = std::lock_guard(wait_mutex_);
+        return fetch_received_;
+    }
+    bool timed_out() const {
+        auto l = std::lock_guard(wait_mutex_);
+        return timed_out_;
+    }
+
+    /// Clear the timed-out flag (e.g. after the command loop has handled it).
+    void clear_timed_out() {
+        auto l = std::lock_guard(wait_mutex_);
+        timed_out_ = false;
+    }
+
+    /// If a proton::error was recorded, assign its message to out, clear it, 
and return true.
+    bool take_last_error(std::string& out) {
+        auto l = std::lock_guard(wait_mutex_);
+        if (last_error_.empty())
+            return false;
+        out = std::move(last_error_);
+        last_error_.clear();
+        return true;
+    }
+
+    /// Wait until N disposition settlements (on_delivery_settle) have been 
received,
+    /// or timeout_seconds (0 = no timeout). Same pattern as fetch.
+    void wait_settled(int n, double timeout_seconds) {
+        auto l = std::unique_lock(wait_mutex_);
+        interrupt_requested_ = false;
+        timed_out_ = false;
+        settled_expected_ = n;
+        settled_received_ = 0;
+        settled_done_ = (n <= 0);
+        l.unlock();
+        if (n <= 0)
+            return;
+        if (timeout_seconds > 0) {
+            auto ms = 
static_cast<proton::duration::numeric_type>(timeout_seconds * 1000);
+            work_queue_->schedule(proton::duration(ms), [this]() { 
timeout_fired(); });
+        }
+        l.lock();
+        wait_cv_.wait(l, [this] { return settled_done_ || timed_out_ || 
interrupt_requested_; });
+        settled_expected_ = 0;
+    }
+    int settled_received_count() const {
+        auto l = std::lock_guard(wait_mutex_);
+        return settled_received_;
+    }
+
+    // Thread-safe: schedule work on the container thread
+    void declare() {
+        work_queue_->add([this]() { do_declare(); });
+    }
+    void commit() {
+        work_queue_->add([this]() { do_commit(); });
+    }
+    void abort() {
+        work_queue_->add([this]() { do_abort(); });
+    }
+    void release() {
+        work_queue_->add([this]() { do_release(); });
+    }
+    void send(int n, const std::string& to_addr) {
+        auto l = std::unique_lock(wait_mutex_);
+        interrupt_requested_ = false;
+        send_pending_ = n;
+        send_to_addr_ = to_addr.empty() ? addr_ : to_addr;
+        l.unlock();
+        work_queue_->add([this]() { try_send(); });
+        l.lock();
+        wait_cv_.wait(l, [this] { return send_pending_ == 0 || 
interrupt_requested_; });
+    }
+    void list_unsettled() {
+        auto count = std::size_t(0);
+        for (auto rcv : session_.receivers()) {
+            for (auto d : rcv.unsettled_deliveries()) {
+                (void)d;
+                ++count;
+            }
+        }
+        std::cout << count << " unsettled delivery(ies)" << std::endl;
+        for (auto rcv : session_.receivers()) {
+            for (auto d : rcv.unsettled_deliveries()) {
+                std::cout << "  " << d.tag() << std::endl;
+            }
+        }
+    }
+    void quit() {
+        work_queue_->add([this]() { do_quit(); });
+    }
+};
+
+using command_fn = bool (*)(tx_recv_interactive& recv, const 
std::vector<std::string>& args);
+
+static bool cmd_declare(tx_recv_interactive& recv, const 
std::vector<std::string>&) {
+    recv.declare();
+    return false;
+}
+static bool cmd_fetch(tx_recv_interactive& recv, const 
std::vector<std::string>& args) {
+    auto n = 1;
+    auto timeout_seconds = 0.0;
+    if (!args.empty()) {
+        try {
+            n = std::stoi(args[0]);
+            if (n < 1) n = 1;
+        } catch (...) {
+            std::cout << "fetch: expected positive number, got '" << args[0] 
<< "'" << std::endl;
+            return false;
+        }
+    }
+    if (args.size() >= 2) {
+        try {
+            timeout_seconds = std::stof(args[1]);
+            if (timeout_seconds < 0) timeout_seconds = 0;
+        } catch (...) {
+            std::cout << "fetch: expected timeout in seconds, got '" << 
args[1] << "'" << std::endl;
+            return false;
+        }
+    }
+    recv.fetch(n, timeout_seconds);
+    std::cout << "fetch: received " << recv.fetch_received_count() << " 
message(s)" << std::endl;
+    return false;
+}
+static bool cmd_commit(tx_recv_interactive& recv, const 
std::vector<std::string>&) {
+    recv.commit();
+    return false;
+}
+static bool cmd_abort(tx_recv_interactive& recv, const 
std::vector<std::string>&) {
+    recv.abort();
+    return false;
+}
+static bool cmd_unsettled(tx_recv_interactive& recv, const 
std::vector<std::string>&) {
+    recv.list_unsettled();
+    return false;
+}
+static bool cmd_release(tx_recv_interactive& recv, const 
std::vector<std::string>&) {
+    recv.release();
+    return false;
+}
+static bool cmd_send(tx_recv_interactive& recv, const 
std::vector<std::string>& args) {
+    auto n = 1;
+    auto to_addr = std::string();
+    if (!args.empty()) {
+        try {
+            n = std::stoi(args[0]);
+            if (n < 1) n = 1;
+        } catch (...) {
+            std::cout << "send: expected positive number, got '" << args[0] << 
"'" << std::endl;
+            return false;
+        }
+    }
+    if (args.size() >= 2)
+        to_addr = args[1];
+    recv.send(n, to_addr);
+    std::cout << "send: sent " << n << " message(s) to " << (to_addr.empty() ? 
"(default address)" : to_addr) << std::endl;
+    return false;
+}
+static bool cmd_wait_settled(tx_recv_interactive& recv, const 
std::vector<std::string>& args) {
+    auto n = 1;
+    auto timeout_seconds = 0.0;
+    if (!args.empty()) {
+        try {
+            n = std::stoi(args[0]);
+            if (n < 0) n = 0;
+        } catch (...) {
+            std::cout << "wait_settled: expected non-negative count, got '" << 
args[0] << "'" << std::endl;
+            return false;
+        }
+    }
+    if (args.size() >= 2) {
+        try {
+            timeout_seconds = std::stof(args[1]);
+            if (timeout_seconds < 0) timeout_seconds = 0;
+        } catch (...) {
+            std::cout << "wait_settled: expected timeout in seconds, got '" << 
args[1] << "'" << std::endl;
+            return false;
+        }
+    }
+    recv.wait_settled(n, timeout_seconds);
+    std::cout << "wait_settled: " << recv.settled_received_count() << " 
settlement(s)" << std::endl;
+    return false;
+}
+static bool cmd_sleep(tx_recv_interactive& recv, const 
std::vector<std::string>& args) {
+    if (args.empty()) {
+        std::cout << "sleep: expected duration in seconds (e.g. sleep 1.5)" << 
std::endl;
+        return false;
+    }
+    float seconds;
+    try {
+        seconds = std::stof(args[0]);
+        if (seconds < 0) seconds = 0;
+    } catch (...) {
+        std::cout << "sleep: expected number of seconds, got '" << args[0] << 
"'" << std::endl;
+        return false;
+    }
+    recv.sleep(static_cast<double>(seconds));
+    return false;
+}
+static bool cmd_quit(tx_recv_interactive&, const std::vector<std::string>&) {
+    return true;
+}
+
+static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&);
+
+struct command_entry {
+    const char* name;
+    const char* description;
+    command_fn fn;
+};
+
+// Lexicographically sorted by name for std::lower_bound lookup
+static constexpr command_entry COMMAND_TABLE[] = {
+    {"abort", "Abort the current transaction", cmd_abort},
+    {"commit", "Commit the current transaction", cmd_commit},
+    {"declare", "Start a transaction", cmd_declare},
+    {"fetch", "Receive n messages (optional timeout in seconds)", cmd_fetch},
+    {"help", "Show this list of commands", cmd_help},
+    {"quit", "Exit the program", cmd_quit},
+    {"release", "Release all unsettled deliveries", cmd_release},
+    {"send", "Send n messages to queue (optional to address; use <none> to 
omit)", cmd_send},
+    {"sleep", "Sleep for given seconds", cmd_sleep},
+    {"unsettled", "List unsettled deliveries", cmd_unsettled},
+    {"wait_settled", "Wait for n disposition updates (optional timeout)", 
cmd_wait_settled},
+};
+static constexpr std::size_t COMMAND_TABLE_SIZE = sizeof(COMMAND_TABLE) / 
sizeof(COMMAND_TABLE[0]);
+
+static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&) {
+    for (std::size_t i = 0; i < COMMAND_TABLE_SIZE; ++i) {
+        std::cout << "  " << COMMAND_TABLE[i].name << " - " << 
COMMAND_TABLE[i].description << "\n";
+    }
+    return false;
+}
+
+/// Split a string into words (by whitespace).
+static std::vector<std::string> split_args(const std::string& line) {
+    std::vector<std::string> out;
+    std::istringstream is(line);
+    for (std::string word; is >> word;) out.push_back(word);
+    return out;
+}
+
+/// Parsed command: first element is command name, remaining elements are 
arguments.
+using parsed_command = std::vector<std::string>;
+
+/// Parse a line into zero or more commands separated by ';'. Each segment is
+/// split into words (command name + args). Empty segments are skipped.
+/// Returns a sequence that can be iterated to run each command.
+static std::vector<parsed_command> parse_command_line(const std::string& line) 
{
+    std::vector<parsed_command> commands;
+    std::istringstream is(line);
+    for (std::string segment; std::getline(is, segment, ';');) {
+        auto args = split_args(segment);
+        if (!args.empty())
+            commands.push_back(std::move(args));
+    }
+    return commands;
+}
+
+static const command_entry* find_command(std::string_view name) {
+    auto it = std::lower_bound(std::begin(COMMAND_TABLE), 
std::end(COMMAND_TABLE), name,
+        [](const command_entry& e, std::string_view s) {
+            return std::string_view(e.name) < s;
+        });
+    if (it != std::end(COMMAND_TABLE) && std::string_view(it->name) == name)
+        return &*it;
+    return nullptr;
+}
+
+static bool execute_command(tx_recv_interactive& recv, const command_entry& 
cmd, const std::vector<std::string>& args) {
+    auto cmd_args = std::vector<std::string>(args.begin() + 1, args.end());
+    return cmd.fn(recv, cmd_args);
+}
+
+#if defined(_WIN32) || defined(_WIN64)
+static void block_interrupt() {
+}
+
+static HANDLE g_ctrl_c_event = nullptr;
+
+static BOOL WINAPI win_ctrl_handler(DWORD dwCtrlType) {
+    if (dwCtrlType == CTRL_C_EVENT && g_ctrl_c_event != nullptr)
+        SetEvent(g_ctrl_c_event);
+    return TRUE;
+}
+
+static void interrupt_thread(tx_recv_interactive* recv) {
+    HANDLE ev = CreateEventW(nullptr, TRUE, FALSE, nullptr);
+    if (ev == nullptr)
+        return;
+    g_ctrl_c_event = ev;
+    if (!SetConsoleCtrlHandler(win_ctrl_handler, TRUE)) {
+        CloseHandle(ev);
+        return;
+    }
+    for (;;) {
+        if (WaitForSingleObject(ev, INFINITE) == WAIT_OBJECT_0 && recv != 
nullptr) {
+            recv->request_interrupt();
+            ResetEvent(ev);
+        }
+    }
+}
+#else
+static void block_interrupt() {
+    sigset_t set;
+    sigemptyset(&set);
+    sigaddset(&set, SIGINT);
+    pthread_sigmask(SIG_BLOCK, &set, nullptr);
+}
+
+static void interrupt_thread(tx_recv_interactive* recv) {
+    sigset_t set;
+    sigemptyset(&set);
+    sigaddset(&set, SIGINT);
+    for (;;) {
+        int sig = 0;
+        if (sigwait(&set, &sig) == 0 && sig == SIGINT && recv != nullptr)
+            recv->request_interrupt();
+    }
+}
+#endif
+
+int main(int argc, char** argv) {
+    block_interrupt();
+
+    auto conn_url = std::string("//127.0.0.1:5672");
+    auto addr = std::string("examples");
+    auto initial_commands = std::string();
+    auto opts = example::options(argc, argv);
+
+    opts.add_value(conn_url, 'u', "url", "connection URL", "URL");
+    opts.add_value(addr, 'a', "address", "address to receive messages from", 
"ADDR");
+    opts.add_value(initial_commands, 'c', "commands", "commands to run before 
interactive mode (e.g. declare; fetch 2; quit)", "COMMANDS");
+
+    try {
+        opts.parse();
+
+        auto recv = tx_recv_interactive(conn_url, addr);
+        auto container = proton::container(recv);
+        auto container_thread = std::thread([&container]() { container.run(); 
});
+
+        std::thread interrupt_th(interrupt_thread, &recv);
+        interrupt_th.detach();
+
+        recv.wait_ready();
+
+        auto line = initial_commands;
+        bool quit_requested = false;
+        while (!quit_requested) {
+            if (line.empty()) {
+                std::cout << "> " << std::flush;
+                if (!std::getline(std::cin, line))
+                    break;
+            }
+            for (const auto& args : parse_command_line(line)) {
+                auto* cmd = find_command(args[0]);
+                if (cmd) {
+                    if (execute_command(recv, *cmd, args)) {
+                        quit_requested = true;
+                        break;
+                    }
+                    if (recv.interrupted()) {
+                        std::cout << "[interrupted]" << std::endl;
+                        recv.clear_interrupt();
+                    }
+                    if (recv.timed_out()) {
+                        std::cout << "[timed out]" << std::endl;
+                        recv.clear_timed_out();
+                    }
+                    recv.sync_with_connection_thread();
+                    if (std::string err; recv.take_last_error(err)) {
+                        std::cout << "[error: " << err << "]" <<std::endl;
+                    }
+                } else {
+                    std::cout << "Unknown command. Type 'help' for a list of 
commands." << std::endl;
+                }
+            }
+            line.clear();
+        }
+
+        recv.quit();
+        container_thread.join();
+        return 0;
+    } catch (const example::bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to