This is an automated email from the ASF dual-hosted git repository. asf-gitbox-commits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit a932ea4322f18b72ee924317dd336d36c924620d Author: Andrew Stitcher <[email protected]> AuthorDate: Fri May 29 01:46:29 2026 -0400 PROTON-2922: Transaction testing Improve interactive tester so that it is capable of automatically testing transactions at build time. It still requires a transaction supporting broker to test against. This code was written with the assistance of Cursor. --- cpp/examples/CMakeLists.txt | 1 - cpp/examples/tx_interactive.cpp | 774 --------------- tests/cpp/CMakeLists.txt | 34 + tests/cpp/tx_tester.cpp | 1997 +++++++++++++++++++++++++++++++++++++++ 4 files changed, 2031 insertions(+), 775 deletions(-) diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 25b414cd6..685a5ed36 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -61,7 +61,6 @@ foreach(example service_bus multithreaded_client multithreaded_client_flow_control - tx_interactive tx_send tx_recv) add_executable(${example} ${example}.cpp) diff --git a/cpp/examples/tx_interactive.cpp b/cpp/examples/tx_interactive.cpp deleted file mode 100644 index d48b21585..000000000 --- a/cpp/examples/tx_interactive.cpp +++ /dev/null @@ -1,774 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "options.hpp" - -#include <proton/connection.hpp> -#include <proton/container.hpp> -#include <proton/delivery.hpp> -#include <proton/duration.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/receiver.hpp> -#include <proton/receiver_options.hpp> -#include <proton/sender.hpp> -#include <proton/sender_options.hpp> -#include <proton/session.hpp> -#include <proton/target_options.hpp> -#include <proton/tracker.hpp> -#include <proton/transfer.hpp> -#include <proton/types.hpp> -#include <proton/work_queue.hpp> - -#include <algorithm> -#include <condition_variable> -#include <cstddef> -#include <exception> -#include <iostream> -#include <mutex> -#include <sstream> -#include <string> -#include <string_view> -#include <thread> -#include <vector> - -#if defined(_WIN32) || defined(_WIN64) -# include <windows.h> -#else -# include <csignal> -#endif - -// Sentinel queue name: omit message 'to' address so the peer may reject the message. -constexpr std::string_view NO_TO_ADDRESS = "<none>"; - -// Interactive tester for AMQP transactions: declare/commit/abort transactions, receive -// messages (with optional timeout), send to the default or a given queue (or omit 'to' -// for rejection testing), wait for disposition updates, and release unsettled. Type -// 'help' for commands. Received messages are accepted on receipt. - -class tx_recv_interactive : public proton::messaging_handler { - private: - std::string conn_url_; - std::string addr_; - - proton::connection connection_; - proton::receiver receiver_; - proton::sender sender_; - proton::session session_; - proton::work_queue* work_queue_ = nullptr; - - int send_pending_ = 0; - int send_next_id_ = 0; - std::string send_to_addr_; - - mutable std::mutex wait_mutex_; - std::condition_variable wait_cv_; - bool ready_ = false; - bool sleep_done_ = true; - bool timed_out_ = false; - int fetch_expected_ = 0; - int fetch_received_ = 0; - bool fetch_done_ = false; - int settled_expected_ = 0; - int settled_received_ = 0; - bool settled_done_ = false; - bool interrupt_requested_ = false; - std::string last_error_; - - /// Run f() in a try block; on error, record the message for later reporting. - template <typename F> - void catch_any_error(F&& f) { - try { - f(); - } catch (const std::exception& e) { - auto l = std::lock_guard(wait_mutex_); - last_error_ = e.what(); - } - } - - std::mutex sync_mutex_; - std::condition_variable sync_cv_; - bool sync_done_ = false; - - void timeout_fired() { - auto l = std::lock_guard(wait_mutex_); - timed_out_ = true; - wait_cv_.notify_all(); - } - - void do_declare() { - catch_any_error([this]() { session_.transaction_declare(); }); - } - - void do_fetch(int n) { - receiver_.add_credit(n); - } - - void do_commit() { - catch_any_error([this]() { session_.transaction_commit(); }); - } - - void do_abort() { - catch_any_error([this]() { session_.transaction_abort(); }); - } - - void do_release() { - for (auto rcv : session_.receivers()) { - for (auto d : rcv.unsettled_deliveries()) { - d.release(); - } - } - } - - void do_quit() { - connection_.close(); - } - - public: - tx_recv_interactive(const std::string& url, const std::string& addr) - : conn_url_(url), addr_(addr) {} - - void on_container_start(proton::container& c) override { - c.connect(conn_url_); - } - - void on_connection_open(proton::connection& conn) override { - connection_ = conn; - work_queue_ = &conn.work_queue(); - // credit_window(0) so we control flow via "fetch" - receiver_ = conn.open_receiver(addr_, proton::receiver_options().credit_window(0)); - sender_ = conn.open_sender("", proton::sender_options{}.target(proton::target_options{}.anonymous(true))); - } - - void on_sender_open(proton::sender& s) override { - sender_ = s; - } - - void on_session_open(proton::session& s) override { - session_ = s; - { - auto l = std::lock_guard(wait_mutex_); - ready_ = true; - } - wait_cv_.notify_all(); - } - - void on_session_transaction_declared(proton::session& s) override { - std::cout << "transaction declared: " << s.transaction_id() << std::endl; - } - - void on_session_transaction_committed(proton::session& s) override { - std::cout << "transaction committed" << std::endl; - } - - void on_session_transaction_aborted(proton::session& s) override { - std::cout << "transaction aborted: " << s.transaction_error().what() << std::endl; - } - - void on_session_transaction_error(proton::session& s) override { - std::cout << "transaction error: " << s.transaction_error().what() << std::endl; - } - - void on_sendable(proton::sender&) override { - try_send(); - } - - void try_send() { - int to_send = 0; - std::string to_addr; - { - auto l = std::lock_guard(wait_mutex_); - to_send = send_pending_; - to_addr = send_to_addr_; - } - int sent = 0; - while (sender_ && sender_.credit() > 0 && to_send > 0) { - proton::message msg; - msg.id(send_next_id_); - if (to_addr != NO_TO_ADDRESS) - msg.to(to_addr); - msg.body(std::map<std::string, int>{{"message", send_next_id_}}); - sender_.send(msg); - ++send_next_id_; - ++sent; - --to_send; - } - if (sent > 0) { - auto l = std::lock_guard(wait_mutex_); - send_pending_ -= sent; - if (send_pending_ == 0) - wait_cv_.notify_all(); - } - } - - void on_message(proton::delivery& d, proton::message& msg) override { - std::cout << d.tag() << ": " << msg.body() << std::endl; - d.accept(); - { - auto l = std::lock_guard(wait_mutex_); - if (fetch_expected_ > 0) { - ++fetch_received_; - if (fetch_received_ >= fetch_expected_) { - fetch_done_ = true; - wait_cv_.notify_all(); - } - } - // Pre-settled deliveries never trigger on_delivery_settle; count them here for wait_settled - if (settled_expected_ > 0 && d.settled()) { - ++settled_received_; - if (settled_received_ >= settled_expected_) { - settled_done_ = true; - wait_cv_.notify_all(); - } - } - } - } - - void on_delivery_settle(proton::delivery&) override { - auto l = std::lock_guard(wait_mutex_); - if (settled_expected_ > 0) { - ++settled_received_; - if (settled_received_ >= settled_expected_) { - settled_done_ = true; - wait_cv_.notify_all(); - } - } - } - - void on_transactional_accept(proton::tracker& t) override { - std::cout << "disposition: accepted: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl; - } - - void on_transactional_reject(proton::tracker& t) override { - std::cout << "disposition: rejected: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl; - } - - void on_transactional_release(proton::tracker& t) override { - std::cout << "disposition: released: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl; - } - - void on_tracker_accept(proton::tracker& t) override { - std::cout << "disposition: accepted: " << t.tag() << std::endl; - } - - void on_tracker_reject(proton::tracker& t) override { - std::cout << "disposition: rejected: " << t.tag() << std::endl; - } - - void on_tracker_release(proton::tracker& t) override { - std::cout << "disposition: released: " << t.tag() << std::endl; - } - - void on_tracker_settle(proton::tracker& t) override { - std::cout << "disposition: settled: " << t.tag() << std::endl; - } - - void on_session_error(proton::session& s) override { - std::cout << "Session error: " << s.error().what() << std::endl; - s.connection().close(); - } - - /// Called from the SIGINT-handling thread to wake any current wait. - void request_interrupt() { - { - auto l = std::lock_guard(wait_mutex_); - interrupt_requested_ = true; - wait_cv_.notify_all(); - } - { - auto l = std::lock_guard(sync_mutex_); - sync_cv_.notify_all(); - } - } - - /// True if the last wait was exited due to Ctrl-C. - bool interrupted() const { - auto l = std::lock_guard(wait_mutex_); - return interrupt_requested_; - } - - /// Clear the interrupt flag (e.g. after the command loop has handled it). - void clear_interrupt() { - auto l = std::lock_guard(wait_mutex_); - interrupt_requested_ = false; - } - - // Thread-safe: wait until handler is ready to accept commands - void wait_ready() { - auto l = std::unique_lock(wait_mutex_); - interrupt_requested_ = false; - wait_cv_.wait(l, [this] { return ready_ || interrupt_requested_; }); - } - - /// Wait until the connection thread has processed all work queued so far. - /// There is no "loop is idle" callback in the Proton API; this adds a - /// sentinel work item and returns when it runs (so the connection thread - /// has caught up). Use before showing the prompt so the command loop stays - /// in sync with background connection-thread activity. - void sync_with_connection_thread() { - { - auto l = std::lock_guard(wait_mutex_); - interrupt_requested_ = false; - } - { - auto l = std::lock_guard(sync_mutex_); - sync_done_ = false; - } - work_queue_->add([this]() { - auto l = std::lock_guard(sync_mutex_); - sync_done_ = true; - sync_cv_.notify_all(); - }); - auto l = std::unique_lock(sync_mutex_); - sync_cv_.wait(l, [this] { - if (sync_done_) return true; - auto w = std::lock_guard(wait_mutex_); - return interrupt_requested_; - }); - } - - /// Schedule a callback on the connection thread after a delay (seconds). - /// wait_sleep_done() blocks until that callback has run. - void sleep(double seconds) { - auto l = std::unique_lock(wait_mutex_); - interrupt_requested_ = false; - sleep_done_ = false; - l.unlock(); - auto ms = static_cast<proton::duration::numeric_type>(seconds * 1000); - work_queue_->schedule(proton::duration(ms), [this]() { - auto l = std::lock_guard(wait_mutex_); - sleep_done_ = true; - wait_cv_.notify_all(); - }); - l.lock(); - wait_cv_.wait(l, [this] { return sleep_done_ || interrupt_requested_; }); - } - - /// Start fetching n messages; optionally timeout after timeout_seconds (0 = no timeout). - /// wait_fetch_done() blocks until n messages received or timeout (whichever first). - void fetch(int n, double timeout_seconds) { - auto l = std::unique_lock(wait_mutex_); - interrupt_requested_ = false; - timed_out_ = false; - fetch_expected_ = n; - fetch_received_ = 0; - fetch_done_ = (n <= 0); - l.unlock(); - work_queue_->add([this, n]() { do_fetch(n); }); - if (timeout_seconds > 0) { - auto ms = static_cast<proton::duration::numeric_type>(timeout_seconds * 1000); - work_queue_->schedule(proton::duration(ms), [this]() { timeout_fired(); }); - } - l.lock(); - wait_cv_.wait(l, [this] { return fetch_done_ || timed_out_ || interrupt_requested_; }); - fetch_expected_ = 0; - } - int fetch_received_count() const { - auto l = std::lock_guard(wait_mutex_); - return fetch_received_; - } - bool timed_out() const { - auto l = std::lock_guard(wait_mutex_); - return timed_out_; - } - - /// Clear the timed-out flag (e.g. after the command loop has handled it). - void clear_timed_out() { - auto l = std::lock_guard(wait_mutex_); - timed_out_ = false; - } - - /// If a proton::error was recorded, assign its message to out, clear it, and return true. - bool take_last_error(std::string& out) { - auto l = std::lock_guard(wait_mutex_); - if (last_error_.empty()) - return false; - out = std::move(last_error_); - last_error_.clear(); - return true; - } - - /// Wait until N disposition settlements (on_delivery_settle) have been received, - /// or timeout_seconds (0 = no timeout). Same pattern as fetch. - void wait_settled(int n, double timeout_seconds) { - auto l = std::unique_lock(wait_mutex_); - interrupt_requested_ = false; - timed_out_ = false; - settled_expected_ = n; - settled_received_ = 0; - settled_done_ = (n <= 0); - l.unlock(); - if (n <= 0) - return; - if (timeout_seconds > 0) { - auto ms = static_cast<proton::duration::numeric_type>(timeout_seconds * 1000); - work_queue_->schedule(proton::duration(ms), [this]() { timeout_fired(); }); - } - l.lock(); - wait_cv_.wait(l, [this] { return settled_done_ || timed_out_ || interrupt_requested_; }); - settled_expected_ = 0; - } - int settled_received_count() const { - auto l = std::lock_guard(wait_mutex_); - return settled_received_; - } - - // Thread-safe: schedule work on the container thread - void declare() { - work_queue_->add([this]() { do_declare(); }); - } - void commit() { - work_queue_->add([this]() { do_commit(); }); - } - void abort() { - work_queue_->add([this]() { do_abort(); }); - } - void release() { - work_queue_->add([this]() { do_release(); }); - } - void send(int n, const std::string& to_addr) { - auto l = std::unique_lock(wait_mutex_); - interrupt_requested_ = false; - send_pending_ = n; - send_to_addr_ = to_addr.empty() ? addr_ : to_addr; - l.unlock(); - work_queue_->add([this]() { try_send(); }); - l.lock(); - wait_cv_.wait(l, [this] { return send_pending_ == 0 || interrupt_requested_; }); - } - void list_unsettled() { - auto count = std::size_t(0); - for (auto rcv : session_.receivers()) { - for (auto d : rcv.unsettled_deliveries()) { - (void)d; - ++count; - } - } - std::cout << count << " unsettled delivery(ies)" << std::endl; - for (auto rcv : session_.receivers()) { - for (auto d : rcv.unsettled_deliveries()) { - std::cout << " " << d.tag() << std::endl; - } - } - } - void quit() { - work_queue_->add([this]() { do_quit(); }); - } -}; - -using command_fn = bool (*)(tx_recv_interactive& recv, const std::vector<std::string>& args); - -static bool cmd_declare(tx_recv_interactive& recv, const std::vector<std::string>&) { - recv.declare(); - return false; -} -static bool cmd_fetch(tx_recv_interactive& recv, const std::vector<std::string>& args) { - auto n = 1; - auto timeout_seconds = 0.0; - if (!args.empty()) { - try { - n = std::stoi(args[0]); - if (n < 1) n = 1; - } catch (...) { - std::cout << "fetch: expected positive number, got '" << args[0] << "'" << std::endl; - return false; - } - } - if (args.size() >= 2) { - try { - timeout_seconds = std::stof(args[1]); - if (timeout_seconds < 0) timeout_seconds = 0; - } catch (...) { - std::cout << "fetch: expected timeout in seconds, got '" << args[1] << "'" << std::endl; - return false; - } - } - recv.fetch(n, timeout_seconds); - std::cout << "fetch: received " << recv.fetch_received_count() << " message(s)" << std::endl; - return false; -} -static bool cmd_commit(tx_recv_interactive& recv, const std::vector<std::string>&) { - recv.commit(); - return false; -} -static bool cmd_abort(tx_recv_interactive& recv, const std::vector<std::string>&) { - recv.abort(); - return false; -} -static bool cmd_unsettled(tx_recv_interactive& recv, const std::vector<std::string>&) { - recv.list_unsettled(); - return false; -} -static bool cmd_release(tx_recv_interactive& recv, const std::vector<std::string>&) { - recv.release(); - return false; -} -static bool cmd_send(tx_recv_interactive& recv, const std::vector<std::string>& args) { - auto n = 1; - auto to_addr = std::string(); - if (!args.empty()) { - try { - n = std::stoi(args[0]); - if (n < 1) n = 1; - } catch (...) { - std::cout << "send: expected positive number, got '" << args[0] << "'" << std::endl; - return false; - } - } - if (args.size() >= 2) - to_addr = args[1]; - recv.send(n, to_addr); - std::cout << "send: sent " << n << " message(s) to " << (to_addr.empty() ? "(default address)" : to_addr) << std::endl; - return false; -} -static bool cmd_wait_settled(tx_recv_interactive& recv, const std::vector<std::string>& args) { - auto n = 1; - auto timeout_seconds = 0.0; - if (!args.empty()) { - try { - n = std::stoi(args[0]); - if (n < 0) n = 0; - } catch (...) { - std::cout << "wait_settled: expected non-negative count, got '" << args[0] << "'" << std::endl; - return false; - } - } - if (args.size() >= 2) { - try { - timeout_seconds = std::stof(args[1]); - if (timeout_seconds < 0) timeout_seconds = 0; - } catch (...) { - std::cout << "wait_settled: expected timeout in seconds, got '" << args[1] << "'" << std::endl; - return false; - } - } - recv.wait_settled(n, timeout_seconds); - std::cout << "wait_settled: " << recv.settled_received_count() << " settlement(s)" << std::endl; - return false; -} -static bool cmd_sleep(tx_recv_interactive& recv, const std::vector<std::string>& args) { - if (args.empty()) { - std::cout << "sleep: expected duration in seconds (e.g. sleep 1.5)" << std::endl; - return false; - } - float seconds; - try { - seconds = std::stof(args[0]); - if (seconds < 0) seconds = 0; - } catch (...) { - std::cout << "sleep: expected number of seconds, got '" << args[0] << "'" << std::endl; - return false; - } - recv.sleep(static_cast<double>(seconds)); - return false; -} -static bool cmd_quit(tx_recv_interactive&, const std::vector<std::string>&) { - return true; -} - -static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&); - -struct command_entry { - const char* name; - const char* description; - command_fn fn; -}; - -// Lexicographically sorted by name for std::lower_bound lookup -static constexpr command_entry COMMAND_TABLE[] = { - {"abort", "Abort the current transaction", cmd_abort}, - {"commit", "Commit the current transaction", cmd_commit}, - {"declare", "Start a transaction", cmd_declare}, - {"fetch", "Receive n messages (optional timeout in seconds)", cmd_fetch}, - {"help", "Show this list of commands", cmd_help}, - {"quit", "Exit the program", cmd_quit}, - {"release", "Release all unsettled deliveries", cmd_release}, - {"send", "Send n messages to queue (optional to address; use <none> to omit)", cmd_send}, - {"sleep", "Sleep for given seconds", cmd_sleep}, - {"unsettled", "List unsettled deliveries", cmd_unsettled}, - {"wait_settled", "Wait for n disposition updates (optional timeout)", cmd_wait_settled}, -}; -static constexpr std::size_t COMMAND_TABLE_SIZE = sizeof(COMMAND_TABLE) / sizeof(COMMAND_TABLE[0]); - -static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&) { - for (std::size_t i = 0; i < COMMAND_TABLE_SIZE; ++i) { - std::cout << " " << COMMAND_TABLE[i].name << " - " << COMMAND_TABLE[i].description << "\n"; - } - return false; -} - -/// Split a string into words (by whitespace). -static std::vector<std::string> split_args(const std::string& line) { - std::vector<std::string> out; - std::istringstream is(line); - for (std::string word; is >> word;) out.push_back(word); - return out; -} - -/// Parsed command: first element is command name, remaining elements are arguments. -using parsed_command = std::vector<std::string>; - -/// Parse a line into zero or more commands separated by ';'. Each segment is -/// split into words (command name + args). Empty segments are skipped. -/// Returns a sequence that can be iterated to run each command. -static std::vector<parsed_command> parse_command_line(const std::string& line) { - std::vector<parsed_command> commands; - std::istringstream is(line); - for (std::string segment; std::getline(is, segment, ';');) { - auto args = split_args(segment); - if (!args.empty()) - commands.push_back(std::move(args)); - } - return commands; -} - -static const command_entry* find_command(std::string_view name) { - auto it = std::lower_bound(std::begin(COMMAND_TABLE), std::end(COMMAND_TABLE), name, - [](const command_entry& e, std::string_view s) { - return std::string_view(e.name) < s; - }); - if (it != std::end(COMMAND_TABLE) && std::string_view(it->name) == name) - return &*it; - return nullptr; -} - -static bool execute_command(tx_recv_interactive& recv, const command_entry& cmd, const std::vector<std::string>& args) { - auto cmd_args = std::vector<std::string>(args.begin() + 1, args.end()); - return cmd.fn(recv, cmd_args); -} - -#if defined(_WIN32) || defined(_WIN64) -static void block_interrupt() { -} - -static HANDLE g_ctrl_c_event = nullptr; - -static BOOL WINAPI win_ctrl_handler(DWORD dwCtrlType) { - if (dwCtrlType == CTRL_C_EVENT && g_ctrl_c_event != nullptr) - SetEvent(g_ctrl_c_event); - return TRUE; -} - -static void interrupt_thread(tx_recv_interactive* recv) { - HANDLE ev = CreateEventW(nullptr, TRUE, FALSE, nullptr); - if (ev == nullptr) - return; - g_ctrl_c_event = ev; - if (!SetConsoleCtrlHandler(win_ctrl_handler, TRUE)) { - CloseHandle(ev); - return; - } - for (;;) { - if (WaitForSingleObject(ev, INFINITE) == WAIT_OBJECT_0 && recv != nullptr) { - recv->request_interrupt(); - ResetEvent(ev); - } - } -} -#else -static void block_interrupt() { - sigset_t set; - sigemptyset(&set); - sigaddset(&set, SIGINT); - pthread_sigmask(SIG_BLOCK, &set, nullptr); -} - -static void interrupt_thread(tx_recv_interactive* recv) { - sigset_t set; - sigemptyset(&set); - sigaddset(&set, SIGINT); - for (;;) { - int sig = 0; - if (sigwait(&set, &sig) == 0 && sig == SIGINT && recv != nullptr) - recv->request_interrupt(); - } -} -#endif - -int main(int argc, char** argv) { - block_interrupt(); - - auto conn_url = std::string("//127.0.0.1:5672"); - auto addr = std::string("examples"); - auto initial_commands = std::string(); - auto opts = example::options(argc, argv); - - opts.add_value(conn_url, 'u', "url", "connection URL", "URL"); - opts.add_value(addr, 'a', "address", "address to receive messages from", "ADDR"); - opts.add_value(initial_commands, 'c', "commands", "commands to run before interactive mode (e.g. declare; fetch 2; quit)", "COMMANDS"); - - try { - opts.parse(); - - auto recv = tx_recv_interactive(conn_url, addr); - auto container = proton::container(recv); - auto container_thread = std::thread([&container]() { container.run(); }); - - std::thread interrupt_th(interrupt_thread, &recv); - interrupt_th.detach(); - - recv.wait_ready(); - - auto line = initial_commands; - bool quit_requested = false; - while (!quit_requested) { - if (line.empty()) { - std::cout << "> " << std::flush; - if (!std::getline(std::cin, line)) - break; - } - for (const auto& args : parse_command_line(line)) { - auto* cmd = find_command(args[0]); - if (cmd) { - if (execute_command(recv, *cmd, args)) { - quit_requested = true; - break; - } - if (recv.interrupted()) { - std::cout << "[interrupted]" << std::endl; - recv.clear_interrupt(); - } - if (recv.timed_out()) { - std::cout << "[timed out]" << std::endl; - recv.clear_timed_out(); - } - recv.sync_with_connection_thread(); - if (std::string err; recv.take_last_error(err)) { - std::cout << "[error: " << err << "]" <<std::endl; - } - } else { - std::cout << "Unknown command. Type 'help' for a list of commands." << std::endl; - } - } - line.clear(); - } - - recv.quit(); - container_thread.join(); - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} diff --git a/tests/cpp/CMakeLists.txt b/tests/cpp/CMakeLists.txt new file mode 100644 index 000000000..955a7aedd --- /dev/null +++ b/tests/cpp/CMakeLists.txt @@ -0,0 +1,34 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Transaction tester: C++ binary plus script-driven CTest. Included from the +# top-level CMakeLists.txt after all bindings are configured. + +if (NOT TARGET qpid-proton-cpp) + return() +endif() + +# Same proton C/C++ headers as cpp/ (tests/cpp is outside that subtree). +include_directories( + "${PROJECT_SOURCE_DIR}/cpp/include" + "${PROJECT_SOURCE_DIR}/c/include") + +add_executable(tx_tester tx_tester.cpp) +target_link_libraries(tx_tester qpid-proton-cpp qpid-proton-core Threads::Threads) +target_include_directories(tx_tester PRIVATE ${PROJECT_SOURCE_DIR}/cpp/examples) diff --git a/tests/cpp/tx_tester.cpp b/tests/cpp/tx_tester.cpp new file mode 100644 index 000000000..df1fd792e --- /dev/null +++ b/tests/cpp/tx_tester.cpp @@ -0,0 +1,1997 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/duration.hpp> +#include <proton/error_condition.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver.hpp> +#include <proton/receiver_options.hpp> +#include <proton/sender.hpp> +#include <proton/sender_options.hpp> +#include <proton/session.hpp> +#include <proton/target_options.hpp> +#include <proton/transport.hpp> +#include <proton/tracker.hpp> +#include <proton/transfer.hpp> +#include <proton/types.hpp> +#include <proton/work_queue.hpp> + +#include <proton/logger.h> +#include <algorithm> +#include <any> +#include <chrono> +#include <condition_variable> +#include <cstddef> +#include <exception> +#include <fstream> +#include <iostream> +#include <iomanip> +#include <iterator> +#include <ostream> +#include <mutex> +#include <optional> +#include <sstream> +#include <string> +#include <string_view> +#include <thread> +#include <variant> +#include <vector> + +#if defined(_WIN32) || defined(_WIN64) +# define WIN32_LEAN_AND_MEAN +# define NOMINMAX +# include <windows.h> +#else +# include <csignal> +#endif + +using namespace std::literals; + +// Sentinel queue name: omit message 'to' address so the peer may reject the message. +inline constexpr auto NO_TO_ADDRESS = "<none>"sv; + +/// After `quit()`, block the main thread up to this long for `on_container_stop` (container `auto_stop` +/// after the connection closes) before `container.stop()` in the destructor. +inline constexpr auto CONTAINER_STOP_GRACE = 1000ms; + +template <class... Ts> +struct overloaded : Ts... { + using Ts::operator()...; +}; +template <class... Ts> +overloaded(Ts...) -> overloaded<Ts...>; + +namespace script { + +/// Shared origin for relative timestamps across deferred output groups. +class logging_epoch { + using steady_clock = std::chrono::steady_clock; + using milliseconds = std::chrono::milliseconds; + + static inline const auto program_epoch_ = steady_clock::now(); + + mutable std::mutex mutex_; + std::decay_t<decltype(program_epoch_)> epoch_{program_epoch_}; + +public: + void reset() { + std::lock_guard<std::mutex> lock(mutex_); + epoch_ = steady_clock::now(); + } + + auto offset_ms() const { + std::lock_guard<std::mutex> lock(mutex_); + return std::chrono::duration_cast<milliseconds>(steady_clock::now() - epoch_); + } +}; + +/// Thread-safe queue of timestamped text lines (one of several deferred output groups sharing a `logging_epoch`). +class logging_buffer { + logging_epoch& epoch_; + mutable std::mutex mutex_; + std::vector<std::string> lines_; + +public: + static std::string timestamp_prefix(std::chrono::milliseconds offset) { + return "+" + std::to_string(offset.count()) + "ms: "; + } + + explicit logging_buffer(logging_epoch& epoch) : epoch_(epoch) {} + + void append_line(std::string line) { + std::lock_guard<std::mutex> lock(mutex_); + lines_.push_back(std::move(line)); + } + + void append_line(std::string_view line) { + append_line(std::string(line)); + } + + void append_timestamped_line(std::string line) { + append_timestamped_line(epoch_.offset_ms(), std::move(line)); + } + + void append_timestamped_line(std::chrono::milliseconds offset, std::string line) { + std::string stamped; + stamped.reserve(16 + line.size()); + stamped += timestamp_prefix(offset); + stamped += std::move(line); + append_line(std::move(stamped)); + } + + void flush(std::ostream& out) { + std::vector<std::string> batch; + { + std::lock_guard<std::mutex> lock(mutex_); + batch.swap(lines_); + } + for (const auto& line : batch) { + out << line << '\n'; + } + } +}; + +struct pending {}; /// Returned only by blocking `wait` subcommands. +struct exit {}; +struct ok {}; +struct status_only{}; /// The commmand did not perform an action. +struct interrupted {}; +struct timed_out {}; +struct proton_err { + std::string what; +}; +/// Script-layer failure: unknown command or variable. +struct script_error { + std::string what; +}; +/// Failed `expect` assertion. +struct assertion_success {}; +struct assertion_failure { + std::string message; +}; + +using outcome = + std::variant<ok, pending, interrupted, timed_out, proton_err, exit, script_error, status_only, assertion_success, assertion_failure>; + +inline std::ostream& operator<<(std::ostream& os, const outcome& c) { + std::visit( + overloaded{ + [&](const pending&) { os << "pending"; }, + [&](const exit&) { os << "exit"; }, + [&](const ok&) { os << "ok"; }, + [&](const status_only&) { os << "status_only"; }, + [&](const interrupted&) { os << "interrupted"; }, + [&](const timed_out&) { os << "timed_out"; }, + [&](const proton_err& e) { os << "proton_error: " << e.what; }, + [&](const script_error& e) { os << "script_error: " << e.what; }, + [&](const assertion_success&) { os << "assertion_success"; }, + [&](const assertion_failure& a) { os << "assertion_failure: " << a.message; }, + }, + c); + return os; +} + +/// True when `run_line`'s outcome should end the REPL (`quit` or failed `expect`). +inline bool should_exit(const outcome& o) noexcept { + return std::holds_alternative<exit>(o) || std::holds_alternative<assertion_failure>(o); +} + +inline bool scripting_only(const script::outcome& o) { + return + std::holds_alternative<script::exit>(o) || + std::holds_alternative<script::script_error>(o) || + std::holds_alternative<script::status_only>(o) || + std::holds_alternative<script::assertion_success>(o) || + std::holds_alternative<script::assertion_failure>(o); +} + +/// Variable name (without `$`) and expanded value for each `$name` token in a command line. +using variable_substitutions = std::vector<std::pair<std::string, std::string>>; + +struct cmd_result_log { + std::chrono::milliseconds offset; + std::string command; + variable_substitutions substitutions; + outcome result; +}; + +class host { + public: + virtual ~host() = default; + + virtual outcome command_hook(class runner& runner, outcome from_command) { return from_command; } + virtual outcome line_hook(class runner& runner, outcome from_line) { return from_line; } + virtual void interrupt_hook(class runner& runner) {} +}; + +struct command_entry { + const char* name; + const char* description; + outcome (*fn)(runner& r, const std::vector<std::string_view>& params); +}; + +/// Same ordering as `std::string_view(a) < std::string_view(b)` for null-terminated names (C++17-safe constexpr). +constexpr int command_name_compare(const char* a, const char* b) noexcept { + while (*a != '\0' && *b != '\0') { + if (*a < *b) return -1; + if (*a > *b) return 1; + ++a; + ++b; + } + if (*a == '\0' && *b == '\0') return 0; + return *a == '\0' ? -1 : 1; +} + +template <std::size_t N> +constexpr bool command_names_sorted(const command_entry (&table)[N]) noexcept { + for (std::size_t i = 1; i < N; ++i) { + if (command_name_compare(table[i - 1].name, table[i].name) >= 0) return false; + } + return true; +} + +/// Parses and dispatches `;`-separated command lines (REPL, `-c` initial script, completion log). +/// The concrete app is held in `std::any` (this example stores `tx_tester&`); `app<App>()` performs `any_cast`. +/// Commands are free functions receiving `script::runner&`. +/// Per-command and per-line policy (variable refresh, container sync, interrupt) lives in `host` hooks. +class runner { + public: + template <typename App> + explicit runner(App& app, logging_buffer& output, host& h): + app_(std::ref(app)), output_(output), host_(h) {} + + /// Forwarded from the host process (e.g. SIGINT handler thread); calls `host::interrupt_hook`. + void request_interrupt() { host_.interrupt_hook(*this); } + + template <typename App> + App& app() { + return std::any_cast<std::reference_wrapper<App>>(app_); + } + + template <typename App> + const App& app() const { + return std::any_cast<std::reference_wrapper<App>>(app_); + } + + /// Runs one input line as a command list. Returns the last command's outcome for this line + /// (empty line → `ok{}`). Use `should_exit` to decide if the REPL should stop (`exit` or `assertion_failure`). + template <int N> + outcome run_line(std::string_view line, const command_entry(&table)[N], logging_epoch& epoch); + + /// Stores a script variable (use as `$name` in commands). Value is substituted verbatim for each token. + void set_variable(std::string name, std::string value); + + /// Sorted variables as `name = quoted(value)` entries. After each pair except the last, appends `between` + /// (default newline). Callers add any leading indent (e.g. `status` uses `" "` + `between` `"\n "`). + std::string format_variables(std::string_view between = "\n") const; + + void log(std::string line) { output_.append_timestamped_line(std::move(line)); } + + private: + void flush_result_log(); + static std::string join_command(const std::vector<std::string_view>& command); + + static std::string format_result(const std::string& command, + const variable_substitutions& substitutions, const outcome& outcome); + + void record_outcome(logging_epoch& epoch, const std::vector<std::string_view>& command, + variable_substitutions substitutions, outcome state); + + outcome execute_command(const std::vector<std::string_view>& command, const command_entry* table, + std::size_t table_size); + + /// Split `text` on any character in `delimiters`; empty segments are dropped. + static std::vector<std::string_view> split_tokens(std::string_view text, std::string_view delimiters); + /// One input line may contain several `;`-separated commands; each command is whitespace-separated tokens. + /// Text from `#` to the end of a segment is ignored; segments that are empty or comment-only are skipped. + static std::vector<std::vector<std::string_view>> commands_from_line(std::string_view line); + + /// Tokens beginning with `$` (rest is the variable name) are replaced with stored values; other tokens unchanged. + /// On success returns `ok{}` (outputs only); on failure returns `script_error` (`out` / `expanded_storage` + /// and `substitutions` may contain entries from tokens processed before the failure). + /// Each successful `$name` substitution appends `{name, value}` to `substitutions` (cleared at entry; on + /// `script_error`, entries from tokens expanded before the failure are left in `substitutions`). + outcome expand_variable_tokens(const std::vector<std::string_view>& tokens, + std::vector<std::string>& expanded_storage, std::vector<std::string_view>& out, + variable_substitutions& substitutions) const; + + std::any app_; + logging_buffer& output_; + host& host_; + std::unordered_map<std::string, std::string> variables_; + mutable std::mutex completion_mutex_; + std::vector<cmd_result_log> completion_log_; +}; + +inline void append_quoted_value(std::string& out, const std::string& value) noexcept { + out.push_back('"'); + for (const char c : value) { + if (c == '"' || c == '\\') out.push_back('\\'); + out.push_back(c); + } + out.push_back('"'); +} + +std::string runner::format_result(const std::string& command, + const variable_substitutions& substitutions, const outcome& outcome) { + std::ostringstream os; + os << "CMD: " << command; + if (!substitutions.empty()) { + os << " ["; + for (std::size_t j = 0; j < substitutions.size(); ++j) { + if (j) os << ", "; + const auto& [name, value] = substitutions[j]; + os << name << "=" << std::quoted(value); + } + os << ']'; + } + os << " -> " << outcome; + return os.str(); +} + +void runner::set_variable(std::string name, std::string value) { + variables_[std::move(name)] = std::move(value); +} + +std::string runner::format_variables(std::string_view between) const { + std::vector<std::pair<std::string, std::string>> pairs(variables_.begin(), variables_.end()); + std::sort(pairs.begin(), pairs.end(), + [](const std::pair<std::string, std::string>& a, const std::pair<std::string, std::string>& b) { + return a.first < b.first; + }); + + std::string out; + auto seps = pairs.size()-1; + for (const auto& [name, value]: pairs) { + out += name; + out += "="; + append_quoted_value(out, value); + if (seps-- > 0) out.append(between); + } + return out; +} + +outcome runner::expand_variable_tokens(const std::vector<std::string_view>& tokens, + std::vector<std::string>& expanded_storage, std::vector<std::string_view>& out, + variable_substitutions& substitutions) const { + expanded_storage.clear(); + out.clear(); + substitutions.clear(); + expanded_storage.reserve(tokens.size()); + out.reserve(tokens.size()); + for (std::string_view tok : tokens) { + if (!tok.empty() && tok.front() == '$') { + const std::string_view name = tok.substr(1); + if (name.empty()) { + return script_error{"empty variable name"}; + } + if (const auto it = variables_.find(std::string(name)); it != variables_.end()) { + substitutions.emplace_back(std::string(name), it->second); + expanded_storage.push_back(it->second); + out.emplace_back(expanded_storage.back()); + } else { + return script_error{"unknown variable '" + std::string(name) + "'"}; + } + } else { + out.push_back(tok); + } + } + return ok{}; +} +std::vector<std::string_view> runner::split_tokens(std::string_view text, std::string_view delimiters) { + std::vector<std::string_view> parts; + std::size_t start = 0; + const char* const base = text.data(); + const auto n = text.size(); + for (std::size_t pos = 0; pos <= n; ++pos) { + if (pos < n && delimiters.find(text[pos]) == std::string_view::npos) continue; + if (pos > start) parts.emplace_back(base + start, pos - start); + start = pos + 1; + } + return parts; +} + +static std::string_view strip_comment(std::string_view text) noexcept { + if (const auto pos = text.find('#'); pos != std::string_view::npos) { + return text.substr(0, pos); + } + return text; +} + +std::vector<std::vector<std::string_view>> runner::commands_from_line(std::string_view line) { + std::vector<std::vector<std::string_view>> command_list; + for (std::string_view raw : split_tokens(line, ";")) { + if (auto tokens = split_tokens(strip_comment(raw), " \n\t"); !tokens.empty()) { + command_list.push_back(std::move(tokens)); + } + } + return command_list; +} + +std::string runner::join_command(const std::vector<std::string_view>& command) { + std::string s; + for (std::size_t i = 0; i < command.size(); ++i) { + if (i) s += ' '; + s.append(command[i].data(), command[i].size()); + } + return s; +} + +const script::command_entry* find_command(std::string_view name, const command_entry* table, std::size_t n) { + auto it = std::lower_bound(table, table + n, name, [](const script::command_entry& e, std::string_view s) { + return std::string_view(e.name) < s; + }); + if (it != table + n && std::string_view(it->name) == name) return it; + return nullptr; +} + +outcome script::runner::execute_command( + const std::vector<std::string_view>& command, const script::command_entry* table, std::size_t table_size) { + if (command.size() < 1) { + return script::script_error{"empty command"}; + } + if (const auto* cmd = find_command(command[0], table, table_size); !cmd) { + return script::script_error{ + "unknown command '" + std::string(command[0]) + "'. Type 'help' for a list of commands."}; + } else { + script::outcome from_command = cmd->fn(*this, std::vector<std::string_view>(command.begin() + 1, command.end())); + return host_.command_hook(*this, from_command); + } +} + +void runner::record_outcome(logging_epoch& epoch, const std::vector<std::string_view>& command, + variable_substitutions substitutions, outcome state) { + const auto offset = epoch.offset_ms(); + auto l = std::lock_guard(completion_mutex_); + completion_log_.push_back( + cmd_result_log{offset, join_command(command), std::move(substitutions), std::move(state)}); +} + +void runner::flush_result_log() { + std::vector<cmd_result_log> rows; + { + auto l = std::lock_guard(completion_mutex_); + if (completion_log_.empty()) { + return; + } + rows = std::move(completion_log_); + completion_log_.clear(); + } + for (const auto& row : rows) { + output_.append_timestamped_line( + row.offset, format_result(row.command, row.substitutions, row.result)); + } +} + +template <int N> +outcome runner::run_line(std::string_view line, const command_entry(&table)[N], logging_epoch& epoch) { + outcome result = ok{}; + const auto command_list = commands_from_line(line); + for (const auto& command : command_list) { + std::vector<std::string> expanded_storage; + std::vector<std::string_view> substituted; + variable_substitutions substitutions; + if (const outcome expand_result = + expand_variable_tokens(command, expanded_storage, substituted, substitutions); + std::holds_alternative<script_error>(expand_result)) { + std::ostringstream os; + os << expand_result; + log(os.str()); + record_outcome(epoch, command, std::move(substitutions), std::move(expand_result)); + continue; + } + const outcome completion = execute_command(substituted, table, N); + record_outcome(epoch, command, std::move(substitutions), completion); + if (should_exit(completion)) { + result = completion; + break; + } + } + flush_result_log(); + return host_.line_hook(*this, result); +} + +} // namespace script + +// Interactive tester for AMQP transactions: declare/commit/abort transactions, receive +// messages via `fetch` / `wait fetched`, `wait incoming` / `wait outgoing` for settlements, `send` then `wait sent` +// until queued sends reach sender_.send; default or a given queue (or omit 'to' +// for rejection testing), wait for declare completion or disposition updates, and +// accept/reject/modify/release unsettled. Type 'help' for commands. +// +// Script variables ($name) are refreshed after each command; use `status` and `expect` to assert them. +// Run against python/examples/broker.py using the build-tree venv Python, e.g.: +// ${BLD}/python/pytest_env/bin/python python/examples/broker.py -t 2 +// (plain `python` lacks the proton binding unless the venv is activated). + +class tx_tester : public proton::messaging_handler { + private: + enum class txn_discharge { none, committed, aborted }; + enum class txn_declare_result { none, ok, error }; + + static const char* txn_discharge_string(txn_discharge d) noexcept { + switch (d) { + case txn_discharge::none: + return "none"; + case txn_discharge::committed: + return "committed"; + case txn_discharge::aborted: + return "aborted"; + } + return "none"; + } + + static const char* txn_declare_result_string(txn_declare_result r) noexcept { + switch (r) { + case txn_declare_result::none: + return "none"; + case txn_declare_result::ok: + return "ok"; + case txn_declare_result::error: + return "error"; + } + return "none"; + } + + static std::string binary_script_string(const proton::binary& id) { + std::ostringstream os; + os << id; + const std::string s = os.str(); + // proton::binary operator<< formats as b"<payload>"; strip wrapper for script variables. + if (s.size() >= 3 && s[0] == 'b' && s[1] == '"' && s.back() == '"') { + return s.substr(2, s.size() - 3); + } + return s; + } + + std::string connection_url_; + std::string receiver_addr_; + + proton::container* container_ = nullptr; + proton::connection connection_; + proton::receiver receiver_; + proton::sender sender_; + proton::session session_; + proton::work_queue* work_queue_ = nullptr; + script::logging_buffer& output_; + + void log(std::string line) { output_.append_timestamped_line(std::move(line)); } + + template <typename... Args> + void log_format(Args&&... args) { + std::ostringstream os; + (os << ... << std::forward<Args>(args)); + log(os.str()); + } + + mutable std::mutex wait_mutex_; + std::condition_variable wait_cv_; + std::string last_error_; + std::string send_to_addr_; + int send_pending_ = 0; + int send_next_id_ = 0; + int fetch_expected_ = 0; + int fetch_received_ = 0; + int messages_received_ = 0; + int settled_expected_ = 0; + int settled_received_ = 0; + int outgoing_expected_ = 0; + int outgoing_received_ = 0; + bool outgoing_done_ = false; + bool session_ready_ = false; + bool timed_out_ = false; + proton::work_handle wait_timeout_handle_ = 0; + bool wait_timeout_scheduled_ = false; + bool interrupt_requested_ = false; + bool fetch_done_ = false; + bool settled_done_ = false; + bool declared_done_ = false; + bool discharged_done_ = false; + bool container_stopped_ = false; + + txn_discharge last_discharge_ = txn_discharge::none; + txn_declare_result last_declare_ = txn_declare_result::none; + proton::binary callback_txn_id_; + int provisional_accept_ = 0; + int provisional_reject_ = 0; + int provisional_release_ = 0; + int tracker_accept_ = 0; + int tracker_reject_ = 0; + int tracker_release_ = 0; + int tracker_settle_ = 0; + + std::mutex sync_mutex_; + std::condition_variable sync_cv_; + bool sync_done_ = false; + + /// Run f() in a try block; on error, record the message for later reporting. + template <typename F> + void catch_any_error(F&& f) { + try { + f(); + } catch (const std::exception& e) { + auto l = std::lock_guard(wait_mutex_); + last_error_ = e.what(); + wait_cv_.notify_all(); + } + } + + /// True when a blocking `wait` should return (timeout, interrupt, or recorded endpoint error). + /// Call only with `wait_mutex_` held. + bool wait_end_requested() const noexcept { + return timed_out_ || interrupt_requested_ || !last_error_.empty(); + } + + /// Format a proton `error_condition` for `take_last_error` / `$error`. + static std::string format_endpoint_error(std::string_view context, const proton::error_condition& ec) { + std::string msg; + msg.reserve(context.size() + 2 + ec.what().size()); + msg.append(context); + msg.append(": "); + msg.append(ec.what()); + return msg; + } + + void do_declare() { + { + auto l = std::lock_guard(wait_mutex_); + declared_done_ = false; + last_discharge_ = txn_discharge::none; + } + catch_any_error([this]() { session_.transaction_declare(); }); + } + + void do_fetch(int n) { + receiver_.add_credit(n); + } + + void do_commit() { + { + auto l = std::lock_guard(wait_mutex_); + discharged_done_ = false; + } + catch_any_error([this]() { session_.transaction_commit(); }); + } + + void do_abort() { + { + auto l = std::lock_guard(wait_mutex_); + discharged_done_ = false; + } + catch_any_error([this]() { session_.transaction_abort(); }); + } + + void do_release() { + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + d.release(); + } + } + } + + void do_accept() { + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + d.accept(); + } + } + } + + void do_reject() { + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + d.reject(); + } + } + } + + void do_modify() { + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + d.modify(); + } + } + } + + int do_unsettled_delivery_count() { + int n = 0; + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + ++n; + } + } + return n; + } + + int do_unsettled_tracker_count() { + int n = 0; + for (auto snd : session_.senders()) { + for (auto t : snd.unsettled_trackers()) { + ++n; + } + } + return n; + } + + void do_incoming() { + log(std::to_string(do_unsettled_delivery_count()) + " unsettled delivery(ies)"); + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + log_format(" ", d.tag()); + } + } + } + + void do_outgoing() { + log(std::to_string(do_unsettled_tracker_count()) + " unsettled tracker(s)"); + for (auto snd : session_.senders()) { + for (auto t : snd.unsettled_trackers()) { + log_format(" ", t.tag()); + } + } + } + + void do_quit() { + connection_.close(); + } + + void do_send(int add_count, std::string to_addr) { + { + auto l = std::lock_guard(wait_mutex_); + interrupt_requested_ = false; + send_pending_ += add_count; + send_to_addr_ = std::move(to_addr); + } + try_send(); + } + + void on_container_start(proton::container& c) override { + container_ = &c; + c.enable_quiescent_callback(true); + c.connect(connection_url_); + } + + void on_container_quiescent(proton::container&) override { + auto l = std::lock_guard(sync_mutex_); + sync_done_ = true; + sync_cv_.notify_all(); + } + + void on_container_stop(proton::container&) override { + { + auto l = std::lock_guard(wait_mutex_); + container_stopped_ = true; + wait_cv_.notify_all(); + } + { + auto l = std::lock_guard(sync_mutex_); + sync_done_ = true; + sync_cv_.notify_all(); + } + } + + void on_connection_open(proton::connection& conn) override { + connection_ = conn; + work_queue_ = &conn.work_queue(); + // credit_window(0) so we control flow via "fetch", explicit acknowledgement + receiver_ = conn.open_receiver(receiver_addr_, proton::receiver_options().credit_window(0).auto_accept(false)); + sender_ = conn.open_sender("", proton::sender_options{}.target(proton::target_options{}.anonymous(true))); + } + + void on_sender_open(proton::sender& s) override { + sender_ = s; + } + + void on_session_open(proton::session& s) override { + session_ = s; + auto l = std::lock_guard(wait_mutex_); + session_ready_ = true; + wait_cv_.notify_all(); + } + + void on_session_transaction_declared(proton::session& s) override { + log_format("transaction declared: ", s.transaction_id()); + auto l = std::lock_guard(wait_mutex_); + callback_txn_id_ = s.transaction_id(); + declared_done_ = true; + last_declare_ = txn_declare_result::ok; + wait_cv_.notify_all(); + } + + void on_session_transaction_committed(proton::session& s) override { + log_format("transaction committed: ", s.transaction_id()); + auto l = std::lock_guard(wait_mutex_); + callback_txn_id_ = s.transaction_id(); + discharged_done_ = true; + last_discharge_ = txn_discharge::committed; + wait_cv_.notify_all(); + } + + void on_session_transaction_aborted(proton::session& s) override { + const auto ec = s.transaction_error(); + log_format("transaction aborted: ", s.transaction_id(), " (", ec.what(), ")"); + auto l = std::lock_guard(wait_mutex_); + callback_txn_id_ = s.transaction_id(); + discharged_done_ = true; + last_discharge_ = txn_discharge::aborted; + if (!ec.empty()) { + last_error_ = format_endpoint_error("transaction", ec); + } + wait_cv_.notify_all(); + } + + void on_session_transaction_error(proton::session& s) override { + const auto ec = s.transaction_error(); + log_format("transaction error: ", ec.what()); + auto l = std::lock_guard(wait_mutex_); + callback_txn_id_ = s.transaction_id(); + declared_done_ = true; + last_declare_ = txn_declare_result::error; + if (!ec.empty()) { + last_error_ = format_endpoint_error("transaction", ec); + } + wait_cv_.notify_all(); + } + + void on_sendable(proton::sender&) override { + try_send(); + } + + void try_send() { + int to_send = 0; + std::string to_addr; + { + auto l = std::lock_guard(wait_mutex_); + to_send = send_pending_; + to_addr = send_to_addr_; + } + int sent = 0; + while (sender_ && sender_.credit() > 0 && to_send > 0) { + proton::message msg; + msg.id(send_next_id_); + if (to_addr != NO_TO_ADDRESS) { + msg.to(to_addr); + } + msg.body(std::map<std::string, int>{{"message", send_next_id_}}); + sender_.send(msg); + ++send_next_id_; + ++sent; + --to_send; + } + if (sent > 0) { + auto l = std::lock_guard(wait_mutex_); + send_pending_ -= sent; + wait_cv_.notify_all(); + } + } + + void on_message(proton::delivery& d, proton::message& msg) override { + log_format(d.tag(), ": ", msg.body()); + auto l = std::lock_guard(wait_mutex_); + ++messages_received_; + if (fetch_expected_ > 0) { + ++fetch_received_; + if (fetch_received_ >= fetch_expected_) { + fetch_done_ = true; + } + } + // Pre-settled deliveries never trigger on_delivery_settle; count them here for `wait incoming` + if (settled_expected_ > 0 && d.settled()) { + ++settled_received_; + if (settled_received_ >= settled_expected_) { + settled_done_ = true; + } + } + wait_cv_.notify_all(); + } + + void on_delivery_settle(proton::delivery&) override { + auto l = std::lock_guard(wait_mutex_); + if (settled_expected_ > 0) { + ++settled_received_; + if (settled_received_ >= settled_expected_) { + settled_done_ = true; + } + } + wait_cv_.notify_all(); + } + + void on_transactional_accept(proton::tracker& t) override { + log_format("tracker: ", t.tag(), " provisional accepted: (txn: ", t.session().transaction_id(), ")"); + auto l = std::lock_guard(wait_mutex_); + ++provisional_accept_; + } + + void on_transactional_reject(proton::tracker& t) override { + log_format("tracker: ", t.tag(), " provisional rejected: (txn: ", t.session().transaction_id(), ")"); + auto l = std::lock_guard(wait_mutex_); + ++provisional_reject_; + } + + void on_transactional_release(proton::tracker& t) override { + log_format("tracker: ", t.tag(), " provisional released: (txn: ", t.session().transaction_id(), ")"); + auto l = std::lock_guard(wait_mutex_); + ++provisional_release_; + } + + void on_tracker_accept(proton::tracker& t) override { + log_format("tracker: ", t.tag(), " accepted: "); + auto l = std::lock_guard(wait_mutex_); + ++tracker_accept_; + } + + void on_tracker_reject(proton::tracker& t) override { + log_format("tracker: ", t.tag(), " rejected: "); + auto l = std::lock_guard(wait_mutex_); + ++tracker_reject_; + } + + void on_tracker_release(proton::tracker& t) override { + log_format("tracker: ", t.tag(), " released: "); + auto l = std::lock_guard(wait_mutex_); + ++tracker_release_; + } + + void on_tracker_settle(proton::tracker& t) override { + log_format("tracker: ", t.tag(), " settled: ", t.state()); + auto l = std::lock_guard(wait_mutex_); + ++tracker_settle_; + if (outgoing_expected_ > 0) { + ++outgoing_received_; + if (outgoing_received_ >= outgoing_expected_) { + outgoing_done_ = true; + } + } + wait_cv_.notify_all(); + } + + void on_session_error(proton::session& s) override { + const auto ec = s.error(); + log_format("Session error: ", ec.what()); + s.connection().close(); + auto l = std::lock_guard(wait_mutex_); + if (!ec.empty()) { + last_error_ = format_endpoint_error("session", ec); + } + wait_cv_.notify_all(); + } + + /// Default `messaging_handler::on_transport_error` throws `proton::error`; record for `$error` instead. + void on_transport_error(proton::transport& t) override { + const auto ec = t.error(); + log_format("transport error: ", ec.what()); + auto l = std::lock_guard(wait_mutex_); + if (!ec.empty()) { + last_error_ = format_endpoint_error("transport", ec); + } + wait_cv_.notify_all(); + } + + void on_connection_error(proton::connection& c) override { + const auto ec = c.error(); + log_format("connection error: ", ec.what()); + c.close(); + auto l = std::lock_guard(wait_mutex_); + if (!ec.empty()) { + last_error_ = format_endpoint_error("connection", ec); + } + wait_cv_.notify_all(); + } + + void on_receiver_error(proton::receiver& l) override { + const auto ec = l.error(); + log_format("receiver error: ", ec.what()); + l.connection().close(); + auto lk = std::lock_guard(wait_mutex_); + if (!ec.empty()) { + last_error_ = format_endpoint_error("receiver", ec); + } + wait_cv_.notify_all(); + } + + void on_sender_error(proton::sender& l) override { + const auto ec = l.error(); + log_format("sender error: ", ec.what()); + l.connection().close(); + auto lk = std::lock_guard(wait_mutex_); + if (!ec.empty()) { + last_error_ = format_endpoint_error("sender", ec); + } + wait_cv_.notify_all(); + } + + /// Run f on the connection thread; block the caller until it finishes. + template <typename F> + void run_on_connection_thread(F&& f) { + if (!work_queue_) { + auto l = std::lock_guard(wait_mutex_); + if (last_error_.empty()) { + last_error_ = "connection not ready"; + } + wait_cv_.notify_all(); + return; + } + std::mutex done_mutex; + std::condition_variable done_cv; + bool done = false; + work_queue_->add([&f, &done_mutex, &done_cv, &done]() { + f(); + auto l = std::lock_guard(done_mutex); + done = true; + done_cv.notify_one(); + }); + std::unique_lock l(done_mutex); + done_cv.wait(l, [&done] { return done; }); + } + + public: + tx_tester(const std::string& url, const std::string& addr, script::logging_buffer& script_output) + : connection_url_(url), receiver_addr_(addr), output_(script_output) {} + + /// Called from the SIGINT-handling thread to wake any current wait. + void request_interrupt() { + auto l = std::lock_guard(wait_mutex_); + interrupt_requested_ = true; + wait_cv_.notify_all(); + } + + /// True if the last wait was exited due to Ctrl-C. + bool interrupted() const { + auto l = std::lock_guard(wait_mutex_); + return interrupt_requested_; + } + + /// Clear the interrupt flag (e.g. after the command loop has handled it). + void clear_interrupt() { + auto l = std::lock_guard(wait_mutex_); + interrupt_requested_ = false; + } + + // Thread-safe: wait until handler is ready to accept commands + void wait_ready() { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + wait_cv_.wait(l, [this] { return session_ready_ || wait_end_requested(); }); + } + + /// Wait until the container thread is quiescent (all queued work and events + /// processed). Use before showing the prompt so the command loop stays in + /// sync with background container-thread activity. + script::outcome sync_with_container_thread(script::outcome from_command) { + { + auto wl = std::lock_guard(wait_mutex_); + if (container_stopped_) { + return script::exit{}; + } + } + std::unique_lock<std::mutex> l(sync_mutex_); + sync_done_ = false; + if (!sync_cv_.wait_for(l, CONTAINER_STOP_GRACE, [this] { return sync_done_; })) { + log("..."); + } + return from_command; + } + + /// Cancel the active wait timeout (`cmd::wait` calls this when the wait finishes). + void cancel_wait_timeout() { + auto l = std::lock_guard(wait_mutex_); + if (!wait_timeout_scheduled_ || !container_) { + return; + } + container_->cancel(wait_timeout_handle_); + wait_timeout_scheduled_ = false; + } + + /// If `seconds` > 0, schedule a timeout on the container after that delay (`cmd::wait` only). + void schedule_wait_timeout(double seconds) { + if (seconds <= 0) { + return; + } + auto l = std::unique_lock(wait_mutex_); + timed_out_ = false; + auto ms = seconds * 1000; + wait_timeout_handle_ = container_->schedule(proton::duration(ms), [this]() { + auto l = std::unique_lock(wait_mutex_); + wait_timeout_scheduled_ = false; + timed_out_ = true; + wait_cv_.notify_all(); + }); + wait_timeout_scheduled_ = true; + } + + /// Block until a container-scheduled wait timeout or interrupt. + void sleep() { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + wait_cv_.wait(l, [this] { return wait_end_requested(); }); + } + + /// Issue receiver credit for n messages (does not block). + void add_credit(int n) { + work_queue_->add([this, n]() { do_fetch(n); }); + } + + /// Wait until n messages are received. Timeout is scheduled by `wait fetched` via + /// `schedule_wait_timeout` before this runs. Does not add credit (use `fetch` first). + void wait_fetched(int n) { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + fetch_expected_ = n; + fetch_received_ = 0; + fetch_done_ = (n <= 0); + wait_cv_.wait(l, [this] { return fetch_done_ || wait_end_requested(); }); + fetch_expected_ = 0; + } + int fetch_received_count() const { + auto l = std::lock_guard(wait_mutex_); + return fetch_received_; + } + /// Return the message-received total and reset it to zero. + int take_messages_received_count() { + auto l = std::lock_guard(wait_mutex_); + const int n = messages_received_; + messages_received_ = 0; + return n; + } + bool timed_out() const { + auto l = std::lock_guard(wait_mutex_); + return timed_out_; + } + + /// Clear the timed-out flag (e.g. after the command loop has handled it). + void clear_timed_out() { + auto l = std::lock_guard(wait_mutex_); + timed_out_ = false; + } + + /// If a handler- or API-recorded error message is pending, assign it to `out`, clear it, and return true. + bool take_last_error(std::string& out) { + auto l = std::lock_guard(wait_mutex_); + if (last_error_.empty()) { + return false; + } + out = std::move(last_error_); + last_error_.clear(); + return true; + } + + /// Count unsettled incoming deliveries (for `wait incoming` default). + int unsettled_incoming_delivery_count() { + int n = 0; + run_on_connection_thread([this, &n]() { n = do_unsettled_delivery_count(); }); + return n; + } + + void wait_incoming(int n) { + auto l = std::unique_lock(wait_mutex_); + settled_expected_ = n; + settled_received_ = 0; + settled_done_ = (n <= 0); + interrupt_requested_ = false; + if (settled_done_) { + settled_done_ = false; + return; + } + wait_cv_.wait(l, [this] { return settled_done_ || wait_end_requested(); }); + settled_done_ = false; + settled_expected_ = 0; + } + int settled_received_count() const { + auto l = std::lock_guard(wait_mutex_); + return settled_received_; + } + + void wait_declared() { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + if (declared_done_) { + declared_done_ = false; + return; + } + wait_cv_.wait(l, [this] { return declared_done_ || wait_end_requested(); }); + declared_done_ = false; + } + + /// Wait until commit or abort completes (`on_session_transaction_committed` or + /// `on_session_transaction_aborted`), or timeout / SIGINT. `discharged_done_` is cleared in + /// `do_commit` / `do_abort` before the operation is sent. + void wait_discharged() { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + if (discharged_done_) { + discharged_done_ = false; + return; + } + wait_cv_.wait(l, [this] { return discharged_done_ || wait_end_requested(); }); + discharged_done_ = false; + } + + // Run synchronously on the connection thread (see `run_on_connection_thread`). + void declare() { + run_on_connection_thread([this]() { do_declare(); }); + } + void commit() { + run_on_connection_thread([this]() { do_commit(); }); + } + void abort() { + run_on_connection_thread([this]() { do_abort(); }); + } + void release() { + run_on_connection_thread([this]() { do_release(); }); + } + void accept() { + run_on_connection_thread([this]() { do_accept(); }); + } + void reject() { + run_on_connection_thread([this]() { do_reject(); }); + } + void modify() { + run_on_connection_thread([this]() { do_modify(); }); + } + /// Queue `n` more messages for sending (subject to link credit). Merges with any pending count on the connection + /// thread (`send_pending_ += n`, `send_to_addr_` overwritten per batch — best-effort for overlapping sends). + void queue_send(int n, const std::string& to_addr) { + std::string addr = to_addr.empty() ? receiver_addr_ : to_addr; + work_queue_->add([this, n, addr = std::move(addr)]() mutable { do_send(n, std::move(addr)); }); + } + + int unsettled_outgoing_tracker_count() { + int n = 0; + run_on_connection_thread([this, &n]() { n = do_unsettled_tracker_count(); }); + return n; + } + + /// Wait until all queued `send` work has been passed to `sender_.send` (see `try_send`), + /// or timeout / interrupt. Not related to tracker settlement. + void wait_queued_send_complete() { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + wait_cv_.wait(l, [this] { return send_pending_ == 0 || wait_end_requested(); }); + } + + /// Wait for `n` outgoing tracker settlements (`on_tracker_settle`), or timeout / interrupt. + void wait_outgoing(int n) { + auto l = std::unique_lock(wait_mutex_); + outgoing_expected_ = n; + outgoing_received_ = 0; + outgoing_done_ = (n <= 0); + interrupt_requested_ = false; + if (outgoing_done_) { + outgoing_done_ = false; + outgoing_expected_ = 0; + return; + } + wait_cv_.wait(l, [this] { return outgoing_done_ || wait_end_requested(); }); + outgoing_done_ = false; + outgoing_expected_ = 0; + } + + int outgoing_received_count() const { + auto l = std::lock_guard(wait_mutex_); + return outgoing_received_; + } + + int send_pending_count() const { + auto l = std::lock_guard(wait_mutex_); + return send_pending_; + } + + int messages_received_count() const { + auto l = std::lock_guard(wait_mutex_); + return messages_received_; + } + + bool transaction_is_declared() { + bool declared = false; + run_on_connection_thread([this, &declared]() { declared = session_.transaction_is_declared(); }); + return declared; + } + + proton::binary transaction_id() { + proton::binary id; + run_on_connection_thread([this, &id]() { id = session_.transaction_id(); }); + return id; + } + + /// Reset lifecycle flags and disposition counters (script variables refreshed separately). + void reset_script_status() { + auto l = std::lock_guard(wait_mutex_); + last_discharge_ = txn_discharge::none; + last_declare_ = txn_declare_result::none; + callback_txn_id_ = proton::binary{}; + provisional_accept_ = 0; + provisional_reject_ = 0; + provisional_release_ = 0; + tracker_accept_ = 0; + tracker_reject_ = 0; + tracker_release_ = 0; + tracker_settle_ = 0; + } + + /// Publish handler state as script variables (use as `$name` in commands and `expect`). + void publish_script_variables(script::runner& runner) { + txn_discharge last_discharge = txn_discharge::none; + txn_declare_result last_declare = txn_declare_result::none; + proton::binary callback_txn_id; + int messages = 0; + int fetch = 0; + int settled = 0; + int outgoing = 0; + int send_pending = 0; + int provisional_accept = 0; + int provisional_reject = 0; + int provisional_release = 0; + int tracker_accept = 0; + int tracker_reject = 0; + int tracker_release = 0; + int tracker_settle = 0; + { + auto l = std::lock_guard(wait_mutex_); + last_discharge = last_discharge_; + last_declare = last_declare_; + callback_txn_id = callback_txn_id_; + messages = messages_received_; + fetch = fetch_received_; + settled = settled_received_; + outgoing = outgoing_received_; + send_pending = send_pending_; + provisional_accept = provisional_accept_; + provisional_reject = provisional_reject_; + provisional_release = provisional_release_; + tracker_accept = tracker_accept_; + tracker_reject = tracker_reject_; + tracker_release = tracker_release_; + tracker_settle = tracker_settle_; + } + const int unsettled_in = unsettled_incoming_delivery_count(); + const int unsettled_out = unsettled_outgoing_tracker_count(); + const bool txn_declared = transaction_is_declared(); + const proton::binary txn_id = transaction_id(); + + runner.set_variable("last_discharge", txn_discharge_string(last_discharge)); + runner.set_variable("last_declare", txn_declare_result_string(last_declare)); + runner.set_variable("txn_id", binary_script_string(txn_id)); + runner.set_variable("callback_txn_id", binary_script_string(callback_txn_id)); + runner.set_variable("txn_declared", txn_declared ? "true" : "false"); + runner.set_variable("messages", std::to_string(messages)); + runner.set_variable("fetch", std::to_string(fetch)); + runner.set_variable("settled", std::to_string(settled)); + runner.set_variable("outgoing", std::to_string(outgoing)); + runner.set_variable("unsettled_in", std::to_string(unsettled_in)); + runner.set_variable("unsettled_out", std::to_string(unsettled_out)); + runner.set_variable("send_pending", std::to_string(send_pending)); + runner.set_variable("provisional_accept", std::to_string(provisional_accept)); + runner.set_variable("provisional_reject", std::to_string(provisional_reject)); + runner.set_variable("provisional_release", std::to_string(provisional_release)); + runner.set_variable("tracker_accept", std::to_string(tracker_accept)); + runner.set_variable("tracker_reject", std::to_string(tracker_reject)); + runner.set_variable("tracker_release", std::to_string(tracker_release)); + runner.set_variable("tracker_settle", std::to_string(tracker_settle)); + } + + void incoming() { + run_on_connection_thread([this]() { do_incoming(); }); + } + void outgoing() { + run_on_connection_thread([this]() { do_outgoing(); }); + } + void quit() { + { + auto l = std::lock_guard(wait_mutex_); + container_stopped_ = false; + } + work_queue_->add([this]() { do_quit(); }); + } + + /// After `quit()`, wait up to `CONTAINER_STOP_GRACE` for `on_container_stop` (typically once + /// `auto_stop` stops the container after the connection closes). If the deadline passes, return anyway + /// so `container.stop()` can run. + void wait_container_stop() { + std::unique_lock<std::mutex> lk(wait_mutex_); + if (container_stopped_) { + return; + } + if (!wait_cv_.wait_for(lk, CONTAINER_STOP_GRACE, [this] { return container_stopped_; })) { + std::cerr << "quit: waited " << CONTAINER_STOP_GRACE.count() + << "ms for container stop; on_container_stop did not arrive, continuing shutdown\n"; + } + } +}; + +namespace cmd { + +std::optional<int> parse_int(std::string_view s) noexcept { + try { + return std::stoi(std::string(s)); + } catch (...) { + return std::nullopt; + } +} + +std::optional<double> try_parse_nonneg_double_token(std::string_view s) noexcept { + try { + std::size_t idx = 0; + if (const double v = std::stod(std::string(s), &idx); idx == s.size()) { + return std::max(0.0, v); + } + return std::nullopt; + } catch (...) { + return std::nullopt; + } +} + +std::optional<std::string_view> arg(const std::vector<std::string_view>& a, size_t i) { + if (i >= a.size()) return std::nullopt; + return std::string_view(a[i]); +} + +bool expect_int(script::runner& runner, const char* cmd, const char* name, + std::optional<std::string_view> text, int& out, int if_absent, int at_least) { + if (!text) { + out = if_absent; + return true; + } + if (auto n = parse_int(*text)) { + out = std::max(at_least, *n); + return true; + } + std::ostringstream os; + os << cmd << ": invalid " << name << " '" << *text << "'"; + runner.log(os.str()); + return false; +} + +script::outcome abort(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().abort(); + return script::ok{}; +} +script::outcome accept(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().accept(); + return script::ok{}; +} +script::outcome commit(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().commit(); + return script::ok{}; +} +script::outcome declare(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().declare(); + return script::ok{}; +} +script::outcome expect(script::runner& runner, const std::vector<std::string_view>& params) { + constexpr auto k_not = "@not"sv; + constexpr auto k_empty = "@empty"sv; + + std::size_t i = 0; + const bool neg = !params.empty() && params[0] == k_not; + if (neg) ++i; + + const std::size_t n = params.size() - i; + bool result = false; + + // No more arguments - treat as true + if (n==0) { + result = true; + } else if (const auto head = params[i]; !head.empty() && head.front() == '@') { + // Any token beginning with `@` here is a predicate name; + if (head == k_empty) { + if (params.size() < i + 2) { + return script::script_error{"expect: @empty requires one argument"}; + } + const std::string_view v = params[i + 1]; + result = v.empty(); + } else { + const std::string err = "unknown expect predicate " + std::string(head); + return script::script_error{err}; + } + } else if (n==1) { + // Treat a single argument as a boolean value. + result = params[i] == "true"; + } else if (n >= 2) { + result = (params[i] == params[i + 1]); + } + if (result == !neg) return script::assertion_success{}; // ok if result is opposite of neg - only status no action + return script::assertion_failure{std::string("[") + runner.format_variables(", ") + "]"}; +} +script::outcome incoming(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().incoming(); + return script::ok{}; +} +script::outcome modify(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().modify(); + return script::ok{}; +} +script::outcome received(script::runner& runner, const std::vector<std::string_view>&) { + runner.log("received: " + std::to_string(runner.app<tx_tester>().take_messages_received_count()) + + " message(s)"); + return script::status_only{}; +} +script::outcome quit(script::runner&, const std::vector<std::string_view>&) { + return script::exit{}; +} +script::outcome reject(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().reject(); + return script::ok{}; +} +script::outcome release(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().release(); + return script::ok{}; +} +script::outcome reset_status(script::runner& runner, const std::vector<std::string_view>&) { + runner.set_variable("error", ""); + runner.set_variable("interrupted", "false"); + runner.set_variable("timedout", "false"); + runner.app<tx_tester>().reset_script_status(); + runner.app<tx_tester>().publish_script_variables(runner); + return script::status_only{}; +} +script::outcome outgoing(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().outgoing(); + return script::ok{}; +} +script::outcome send(script::runner& runner, const std::vector<std::string_view>& params) { + int n = 0; + if (!expect_int(runner, "send", "message count (positive integer)", arg(params, 0), n, 1, 1)) { + return script::ok{}; + } + std::string to_addr; + if (params.size() >= 2) to_addr = std::string{params[1]}; + runner.app<tx_tester>().queue_send(n, to_addr); + runner.log("send: queued " + std::to_string(n) + " message(s) to " + + (to_addr.empty() ? "(default address)" : to_addr)); + return script::ok{}; +} + +script::outcome status(script::runner& runner, const std::vector<std::string_view>&) { + runner.log(std::string(" ") + runner.format_variables("\n ")); + return script::status_only{}; +} + +script::outcome wait_sleep(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().sleep(); + return script::pending{}; +} +script::outcome wait_declared(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().wait_declared(); + return script::pending{}; +} +script::outcome wait_discharged(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().wait_discharged(); + return script::pending{}; +} +script::outcome wait_sent(script::runner& runner, const std::vector<std::string_view>&) { + runner.app<tx_tester>().wait_queued_send_complete(); + return script::pending{}; +} +script::outcome fetch(script::runner& runner, const std::vector<std::string_view>& params) { + int n = 0; + if (!expect_int(runner, "fetch", "credit count (positive integer)", arg(params, 0), n, 1, 1)) { + return script::ok{}; + } + runner.app<tx_tester>().add_credit(n); + runner.log("fetch: added credit for " + std::to_string(n) + " message(s)"); + return script::ok{}; +} +script::outcome wait_fetched(script::runner& runner, const std::vector<std::string_view>& params) { + int n = 0; + if (!expect_int(runner, "wait fetched", "message count (positive integer)", arg(params, 0), n, 1, 1)) { + return script::ok{}; + } + runner.app<tx_tester>().wait_fetched(n); + runner.log("wait fetched: received " + + std::to_string(runner.app<tx_tester>().fetch_received_count()) + " message(s)"); + return script::pending{}; +} +script::outcome wait_incoming(script::runner& runner, const std::vector<std::string_view>& params) { + int n = 0; + if (!arg(params, 0)) { + n = runner.app<tx_tester>().unsettled_incoming_delivery_count(); + } else { + if (!expect_int(runner, "wait incoming", "settlement count (non-negative integer)", arg(params, 0), n, 0, 0)) { + return script::ok{}; + } + } + runner.app<tx_tester>().wait_incoming(n); + runner.log("wait incoming: " + + std::to_string(runner.app<tx_tester>().settled_received_count()) + " settlement(s)"); + return script::pending{}; +} +script::outcome wait_outgoing(script::runner& runner, const std::vector<std::string_view>& params) { + int n = 0; + if (!arg(params, 0)) { + n = runner.app<tx_tester>().unsettled_outgoing_tracker_count(); + } else { + if (!expect_int(runner, "wait outgoing", "settlement count (non-negative integer)", arg(params, 0), n, 0, 0)) { + return script::ok{}; + } + } + runner.app<tx_tester>().wait_outgoing(n); + runner.log("wait outgoing: " + + std::to_string(runner.app<tx_tester>().outgoing_received_count()) + " tracker settlement(s)"); + return script::pending{}; +} + +script::outcome wait_help(script::runner& runner, const std::vector<std::string_view>&); + +inline constexpr script::command_entry wait_command_table[] = { + {"declared", "Wait for declare", wait_declared}, + {"discharged", "Wait for commit or abort to finish", wait_discharged}, + {"fetched", "Wait for n messages to arrive (default: 1)", wait_fetched}, + {"help", "Show wait subcommands", wait_help}, + {"incoming", "Wait for n incoming delivery settlements (default: unsettled incoming count)", wait_incoming}, + {"outgoing", "Wait for n outgoing tracker settlements (default: unsettled outgoing count)", wait_outgoing}, + {"sent", "Wait for queued send work to be sent (default: link credit)", wait_sent}, +}; +static_assert(script::command_names_sorted(wait_command_table), + "wait_command_table must be strictly sorted by name (find_command uses std::lower_bound)"); + +script::outcome wait_help(script::runner& runner, const std::vector<std::string_view>&) { + runner.log(std::string( + "wait [timeout] <subcommand> ... — optional leading timeout (seconds) applies to that wait;\n" + "bare `wait` or `wait <timeout>` sleeps until timeout or interrupt;")); + constexpr std::size_t n = sizeof(wait_command_table) / sizeof(wait_command_table[0]); + for (std::size_t i = 0; i < n; ++i) { + runner.log(" " + std::string(wait_command_table[i].name) + " - " + wait_command_table[i].description); + } + return script::status_only{}; +} + +/// `wait` sub-dispatch and outcome folding (interrupt / timeout / error vs `Pending` from blocking subcommands). +script::outcome wait(script::runner& runner, const std::vector<std::string_view>& params) { + auto pit = params.begin(); + double timeout_seconds = 0.0; + if (pit != params.end()) { + if (auto t = try_parse_nonneg_double_token(*pit)) { + timeout_seconds = *t; + ++pit; + } + } + + script::outcome (*fn)(script::runner&, const std::vector<std::string_view>&) = nullptr; + if (pit == params.end()) { + fn = wait_sleep; + } else if (const auto* wcmd = + script::find_command(*pit, wait_command_table, std::size(wait_command_table)); + wcmd) { + fn = wcmd->fn; + ++pit; + } else { + std::string expl = "wait: unknown subcommand ("; + const char* sep = ""; + constexpr std::size_t wn = sizeof(wait_command_table) / sizeof(wait_command_table[0]); + for (std::size_t i = 0; i < wn; ++i) { + expl += sep; + expl += wait_command_table[i].name; + sep = ", "; + } + expl += ')'; + return script::script_error{std::move(expl)}; + } + + runner.app<tx_tester>().schedule_wait_timeout(timeout_seconds); + auto result = fn(runner, std::vector<std::string_view>{pit, params.end()}); + runner.app<tx_tester>().cancel_wait_timeout(); + return result; +} + +script::outcome help(script::runner&, const std::vector<std::string_view>&); + +inline constexpr script::command_entry command_table[] = { + {"abort", "Abort the current transaction", abort}, + {"accept", "Accept all unsettled deliveries", accept}, + {"commit", "Commit the current transaction", commit}, + {"declare", "Start a transaction (not again until commit or abort)", declare}, + {"expect", "Script assertion; failure exits the REPL", expect}, + {"fetch", "Add receiver credit for n messages (default 1)", fetch}, + {"help", "Show this list of commands", help}, + {"incoming", "List unsettled incoming deliveries", incoming}, + {"modify", "Modify all unsettled deliveries", modify}, + {"outgoing", "List unsettled outgoing trackers", outgoing}, + {"quit", "Exit the program", quit}, + {"received", "Print messages received since last 'received', then reset counter", received}, + {"reject", "Reject all unsettled deliveries", reject}, + {"release", "Release all unsettled deliveries", release}, + {"reset_status", "Clear $error, $interrupted, $timedout; reset txn/disposition script variables", reset_status}, + {"send", "Queue n messages to send (optional to address; use <none> to omit)", send}, + {"status", "Show script variable names and values", status}, + {"wait", + "Block until an event: wait [timeout] declared | discharged | fetched n | incoming [n] | outgoing [n] | sent | help", + wait}, +}; +static_assert(script::command_names_sorted(command_table), + "command_table must be strictly sorted by name (find_command uses std::lower_bound)"); + +script::outcome help(script::runner& runner, const std::vector<std::string_view>&) { + constexpr std::size_t n = sizeof(command_table) / sizeof(command_table[0]); + for (std::size_t i = 0; i < n; ++i) { + runner.log(" " + std::string(command_table[i].name) + " - " + command_table[i].description); + } + runner.log( + "Script variables (refreshed after each command; use with expect and status):\n" + " $error $interrupted $timedout — wait outcome\n" + " $txn_declared $last_declare $last_discharge $txn_id $callback_txn_id — transaction lifecycle\n" + " $messages $fetch $settled $outgoing $unsettled_in $unsettled_out $send_pending — flow\n" + " $provisional_accept $provisional_reject $provisional_release — transactional tracker events\n" + " $tracker_accept $tracker_reject $tracker_release $tracker_settle — final tracker events"); + return script::status_only{}; +} + +class tx_tester_host : public script::host { + public: + explicit tx_tester_host(tx_tester& app, bool interactive): app_(app), interactive_(interactive) {} + + script::outcome command_hook(script::runner& runner, script::outcome from_command) override { + // On entry the only possible outcomes from commands are currently: + // ok{} which is the usual completion status of non waiting commands, however there could have been an error that occured + // so that should be translated to proton_err{}. + // pending{} which is the outcome of waiting commands, which could be interrupted{}, timed_out{} or proton_err{}, or ok{} + // if the wait was successful. + // exit{} which currently only comes from the exit command and tells the loop to exit. + // assertion_failure{} which currently only comes from the expect command and is also currently treated as a main loop exit. + // + // status_only{} which comes from commands that take no action and so there is no need to sync with the event loop. + // script_err{} can occur if the command is invalid in some way or a variable doesn't exist. + // + // Currently proton_err{}, interrupted{}, timed_out{} are never returned directly from a command and must be synthesised here. + // + + if (scripting_only(from_command)) { + return from_command; + } + + needs_sync = true; + + std::string err; + const bool has_err = app_.take_last_error(err); + + app_.publish_script_variables(runner); + + if (has_err) { + runner.set_variable("error", err); + return script::proton_err{std::move(err)}; + } + + // At this point only ok{} and pending{} are left as possibilities. + if (!std::holds_alternative<script::pending>(from_command)) { + return from_command; + } + + const bool interrupted = app_.interrupted(); + const bool timed_out = app_.timed_out(); + + app_.clear_interrupt(); + app_.clear_timed_out(); + + if (interrupted) { + runner.set_variable("interrupted", "true"); + return script::interrupted{}; + } + if (timed_out) { + runner.set_variable("timedout", "true"); + return script::timed_out{}; + } + return script::ok{}; + } + + script::outcome line_hook(script::runner&, script::outcome from_line) override { + if (interactive_ && needs_sync) { + auto result = app_.sync_with_container_thread(from_line); + needs_sync = false; + return result; + } + return from_line; + } + + void interrupt_hook(script::runner&) override { app_.request_interrupt(); } + + private: + tx_tester& app_; + bool interactive_; + bool needs_sync = false; +}; + +} // namespace cmd + +#if defined(_WIN32) || defined(_WIN64) +static void block_interrupt() { +} + +static HANDLE g_ctrl_c_event = nullptr; + +static BOOL WINAPI win_ctrl_handler(DWORD dwCtrlType) { + if (dwCtrlType == CTRL_C_EVENT && g_ctrl_c_event != nullptr) { + SetEvent(g_ctrl_c_event); + } + return TRUE; +} + +static void interrupt_thread(script::runner& runner) { + HANDLE ev = CreateEventW(nullptr, TRUE, FALSE, nullptr); + if (ev == nullptr) { + return; + } + g_ctrl_c_event = ev; + if (!SetConsoleCtrlHandler(win_ctrl_handler, TRUE)) { + CloseHandle(ev); + return; + } + for (;;) { + if (WaitForSingleObject(ev, INFINITE) == WAIT_OBJECT_0) { + runner.request_interrupt(); + ResetEvent(ev); + } + } +} +#else +static void block_interrupt() { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGINT); + pthread_sigmask(SIG_BLOCK, &set, nullptr); +} + +static void interrupt_thread(script::runner& runner) { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGINT); + for (;;) { + int sig = 0; + if (sigwait(&set, &sig) == 0 && sig == SIGINT) { + runner.request_interrupt(); + } + } +} +#endif + +static void library_log_sink(intptr_t context, pn_log_subsystem_t sub, pn_log_level_t sev, + const char* message) { + script::logging_buffer* output_buffer = reinterpret_cast<script::logging_buffer*>(context); + std::ostringstream line; + line << '[' << pn_logger_subsystem_name(sub) << "]:" + << pn_logger_level_name(sev) << ": " << (message ? message : ""); + output_buffer->append_timestamped_line(line.str()); +} + +void install_log_sink(script::logging_buffer& output_buffer) { + pn_logger_set_log_sink(pn_default_logger(), library_log_sink, reinterpret_cast<intptr_t>(&output_buffer)); +} + +std::string interactive_prompt() { + std::cout << "> " << std::flush; + std::string line; + std::getline(std::cin, line); + return line; +} + +/// Runs `container.run()` on a worker thread. Destructor calls `container.stop()` (no-op if already +/// stopping or if `run()` has already returned) then `join()`. For orderly shutdown, `main` should call +/// `recv.quit()`, then `recv.wait_container_stop_after_quit()`, so `auto_stop` can end `run()` before `stop()`. +struct proton_container_runner { + proton::container& container; + std::thread thread; + + proton_container_runner(const proton_container_runner&) = delete; + proton_container_runner& operator=(const proton_container_runner&) = delete; + proton_container_runner(proton_container_runner&&) = delete; + proton_container_runner& operator=(proton_container_runner&&) = delete; + + explicit proton_container_runner(proton::container& c): + container(c), + thread([this]() { container.run(); }) + {} + + ~proton_container_runner() { + container.stop(); + if (thread.joinable()) + thread.join(); + } +}; + +int main(int argc, char** argv) { + block_interrupt(); + + auto conn_url = std::string("//127.0.0.1:5672"); + auto addr = std::string("examples"); + auto initial_command_list = std::string(); + auto command_file = std::string(); + auto opts = example::options(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connection URL", "URL"); + opts.add_value(addr, 'a', "address", "address to receive messages from", "ADDR"); + opts.add_value(initial_command_list, 'c', "commands", + "command list to run before interactive mode (e.g. declare; fetch 2; wait fetched 2; quit)", "COMMANDS"); + opts.add_value(command_file, 'f', "file", "read commands from FILE ('-' for stdin; no interactive mode)", "FILE"); + + try { + opts.parse(); + + std::ifstream command_file_stream; + std::istream* command_input = nullptr; + if (!command_file.empty()) { + if (command_file == "-") { + command_input = &std::cin; + } else { + command_file_stream.open(command_file); + if (!command_file_stream) { + throw std::runtime_error("cannot open command file '" + command_file + "'"); + } + command_input = &command_file_stream; + } + } + + script::logging_epoch epoch; + script::logging_buffer script_output{epoch}; + + install_log_sink(script_output); + + auto recv = tx_tester(conn_url, addr, script_output); + auto container = proton::container(recv); + proton_container_runner runner(container); + + cmd::tx_tester_host tester_host{recv, command_input == nullptr}; + script::runner script_runner{recv, script_output, tester_host}; + std::thread interrupt_th(interrupt_thread, std::ref(script_runner)); + interrupt_th.detach(); + + recv.wait_ready(); + if (std::string startup_err; recv.take_last_error(startup_err)) { + std::cerr << startup_err << std::endl; + return 1; + } + + cmd::reset_status(script_runner, {}); // Set up initial status values + script::outcome outcome; + auto line = initial_command_list; + for (;;) { + outcome = script_runner.run_line(line, cmd::command_table, epoch); + script_output.flush(std::cout); + if (script::should_exit(outcome)) break; + if (command_input) { + if (!std::getline(*command_input, line)) + break; + } else { + line = interactive_prompt(); + if (!std::cin) break; + epoch.reset(); + } + } + + recv.quit(); + recv.wait_container_stop(); + script_output.flush(std::cout); + return std::holds_alternative<script::assertion_failure>(outcome) ? 1 : 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::bad_any_cast& e) { + std::cerr << "Internal error: script runner app type mismatch (bad any_cast). " << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } catch (...) { + std::cerr << "Unknown exception" << std::endl; + } + + return 1; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
