This is an automated email from the ASF dual-hosted git repository. astitcher pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
commit d90c70637fed77c18e62fb12b84613d6ffc865df Author: Andrew Stitcher <[email protected]> AuthorDate: Fri Mar 6 17:10:56 2026 -0500 PROTON-1442: [C++] Interactive transaction tester You can interactively declare, commit, abort transactions; fetch/send messages; list pending unsettled messages; release unsettled messages This needs to be run against a broker that supports transactions Probably shouldn't be in examples as it's really a tester This code was written with the assistance of Cursor. --- cpp/examples/CMakeLists.txt | 1 + cpp/examples/tx_interactive.cpp | 774 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 775 insertions(+) diff --git a/cpp/examples/CMakeLists.txt b/cpp/examples/CMakeLists.txt index 685a5ed36..25b414cd6 100644 --- a/cpp/examples/CMakeLists.txt +++ b/cpp/examples/CMakeLists.txt @@ -61,6 +61,7 @@ foreach(example service_bus multithreaded_client multithreaded_client_flow_control + tx_interactive tx_send tx_recv) add_executable(${example} ${example}.cpp) diff --git a/cpp/examples/tx_interactive.cpp b/cpp/examples/tx_interactive.cpp new file mode 100644 index 000000000..d48b21585 --- /dev/null +++ b/cpp/examples/tx_interactive.cpp @@ -0,0 +1,774 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "options.hpp" + +#include <proton/connection.hpp> +#include <proton/container.hpp> +#include <proton/delivery.hpp> +#include <proton/duration.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver.hpp> +#include <proton/receiver_options.hpp> +#include <proton/sender.hpp> +#include <proton/sender_options.hpp> +#include <proton/session.hpp> +#include <proton/target_options.hpp> +#include <proton/tracker.hpp> +#include <proton/transfer.hpp> +#include <proton/types.hpp> +#include <proton/work_queue.hpp> + +#include <algorithm> +#include <condition_variable> +#include <cstddef> +#include <exception> +#include <iostream> +#include <mutex> +#include <sstream> +#include <string> +#include <string_view> +#include <thread> +#include <vector> + +#if defined(_WIN32) || defined(_WIN64) +# include <windows.h> +#else +# include <csignal> +#endif + +// Sentinel queue name: omit message 'to' address so the peer may reject the message. +constexpr std::string_view NO_TO_ADDRESS = "<none>"; + +// Interactive tester for AMQP transactions: declare/commit/abort transactions, receive +// messages (with optional timeout), send to the default or a given queue (or omit 'to' +// for rejection testing), wait for disposition updates, and release unsettled. Type +// 'help' for commands. Received messages are accepted on receipt. + +class tx_recv_interactive : public proton::messaging_handler { + private: + std::string conn_url_; + std::string addr_; + + proton::connection connection_; + proton::receiver receiver_; + proton::sender sender_; + proton::session session_; + proton::work_queue* work_queue_ = nullptr; + + int send_pending_ = 0; + int send_next_id_ = 0; + std::string send_to_addr_; + + mutable std::mutex wait_mutex_; + std::condition_variable wait_cv_; + bool ready_ = false; + bool sleep_done_ = true; + bool timed_out_ = false; + int fetch_expected_ = 0; + int fetch_received_ = 0; + bool fetch_done_ = false; + int settled_expected_ = 0; + int settled_received_ = 0; + bool settled_done_ = false; + bool interrupt_requested_ = false; + std::string last_error_; + + /// Run f() in a try block; on error, record the message for later reporting. + template <typename F> + void catch_any_error(F&& f) { + try { + f(); + } catch (const std::exception& e) { + auto l = std::lock_guard(wait_mutex_); + last_error_ = e.what(); + } + } + + std::mutex sync_mutex_; + std::condition_variable sync_cv_; + bool sync_done_ = false; + + void timeout_fired() { + auto l = std::lock_guard(wait_mutex_); + timed_out_ = true; + wait_cv_.notify_all(); + } + + void do_declare() { + catch_any_error([this]() { session_.transaction_declare(); }); + } + + void do_fetch(int n) { + receiver_.add_credit(n); + } + + void do_commit() { + catch_any_error([this]() { session_.transaction_commit(); }); + } + + void do_abort() { + catch_any_error([this]() { session_.transaction_abort(); }); + } + + void do_release() { + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + d.release(); + } + } + } + + void do_quit() { + connection_.close(); + } + + public: + tx_recv_interactive(const std::string& url, const std::string& addr) + : conn_url_(url), addr_(addr) {} + + void on_container_start(proton::container& c) override { + c.connect(conn_url_); + } + + void on_connection_open(proton::connection& conn) override { + connection_ = conn; + work_queue_ = &conn.work_queue(); + // credit_window(0) so we control flow via "fetch" + receiver_ = conn.open_receiver(addr_, proton::receiver_options().credit_window(0)); + sender_ = conn.open_sender("", proton::sender_options{}.target(proton::target_options{}.anonymous(true))); + } + + void on_sender_open(proton::sender& s) override { + sender_ = s; + } + + void on_session_open(proton::session& s) override { + session_ = s; + { + auto l = std::lock_guard(wait_mutex_); + ready_ = true; + } + wait_cv_.notify_all(); + } + + void on_session_transaction_declared(proton::session& s) override { + std::cout << "transaction declared: " << s.transaction_id() << std::endl; + } + + void on_session_transaction_committed(proton::session& s) override { + std::cout << "transaction committed" << std::endl; + } + + void on_session_transaction_aborted(proton::session& s) override { + std::cout << "transaction aborted: " << s.transaction_error().what() << std::endl; + } + + void on_session_transaction_error(proton::session& s) override { + std::cout << "transaction error: " << s.transaction_error().what() << std::endl; + } + + void on_sendable(proton::sender&) override { + try_send(); + } + + void try_send() { + int to_send = 0; + std::string to_addr; + { + auto l = std::lock_guard(wait_mutex_); + to_send = send_pending_; + to_addr = send_to_addr_; + } + int sent = 0; + while (sender_ && sender_.credit() > 0 && to_send > 0) { + proton::message msg; + msg.id(send_next_id_); + if (to_addr != NO_TO_ADDRESS) + msg.to(to_addr); + msg.body(std::map<std::string, int>{{"message", send_next_id_}}); + sender_.send(msg); + ++send_next_id_; + ++sent; + --to_send; + } + if (sent > 0) { + auto l = std::lock_guard(wait_mutex_); + send_pending_ -= sent; + if (send_pending_ == 0) + wait_cv_.notify_all(); + } + } + + void on_message(proton::delivery& d, proton::message& msg) override { + std::cout << d.tag() << ": " << msg.body() << std::endl; + d.accept(); + { + auto l = std::lock_guard(wait_mutex_); + if (fetch_expected_ > 0) { + ++fetch_received_; + if (fetch_received_ >= fetch_expected_) { + fetch_done_ = true; + wait_cv_.notify_all(); + } + } + // Pre-settled deliveries never trigger on_delivery_settle; count them here for wait_settled + if (settled_expected_ > 0 && d.settled()) { + ++settled_received_; + if (settled_received_ >= settled_expected_) { + settled_done_ = true; + wait_cv_.notify_all(); + } + } + } + } + + void on_delivery_settle(proton::delivery&) override { + auto l = std::lock_guard(wait_mutex_); + if (settled_expected_ > 0) { + ++settled_received_; + if (settled_received_ >= settled_expected_) { + settled_done_ = true; + wait_cv_.notify_all(); + } + } + } + + void on_transactional_accept(proton::tracker& t) override { + std::cout << "disposition: accepted: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl; + } + + void on_transactional_reject(proton::tracker& t) override { + std::cout << "disposition: rejected: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl; + } + + void on_transactional_release(proton::tracker& t) override { + std::cout << "disposition: released: " << t.tag() << " (transactional: " << t.session().transaction_id() << ")" << std::endl; + } + + void on_tracker_accept(proton::tracker& t) override { + std::cout << "disposition: accepted: " << t.tag() << std::endl; + } + + void on_tracker_reject(proton::tracker& t) override { + std::cout << "disposition: rejected: " << t.tag() << std::endl; + } + + void on_tracker_release(proton::tracker& t) override { + std::cout << "disposition: released: " << t.tag() << std::endl; + } + + void on_tracker_settle(proton::tracker& t) override { + std::cout << "disposition: settled: " << t.tag() << std::endl; + } + + void on_session_error(proton::session& s) override { + std::cout << "Session error: " << s.error().what() << std::endl; + s.connection().close(); + } + + /// Called from the SIGINT-handling thread to wake any current wait. + void request_interrupt() { + { + auto l = std::lock_guard(wait_mutex_); + interrupt_requested_ = true; + wait_cv_.notify_all(); + } + { + auto l = std::lock_guard(sync_mutex_); + sync_cv_.notify_all(); + } + } + + /// True if the last wait was exited due to Ctrl-C. + bool interrupted() const { + auto l = std::lock_guard(wait_mutex_); + return interrupt_requested_; + } + + /// Clear the interrupt flag (e.g. after the command loop has handled it). + void clear_interrupt() { + auto l = std::lock_guard(wait_mutex_); + interrupt_requested_ = false; + } + + // Thread-safe: wait until handler is ready to accept commands + void wait_ready() { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + wait_cv_.wait(l, [this] { return ready_ || interrupt_requested_; }); + } + + /// Wait until the connection thread has processed all work queued so far. + /// There is no "loop is idle" callback in the Proton API; this adds a + /// sentinel work item and returns when it runs (so the connection thread + /// has caught up). Use before showing the prompt so the command loop stays + /// in sync with background connection-thread activity. + void sync_with_connection_thread() { + { + auto l = std::lock_guard(wait_mutex_); + interrupt_requested_ = false; + } + { + auto l = std::lock_guard(sync_mutex_); + sync_done_ = false; + } + work_queue_->add([this]() { + auto l = std::lock_guard(sync_mutex_); + sync_done_ = true; + sync_cv_.notify_all(); + }); + auto l = std::unique_lock(sync_mutex_); + sync_cv_.wait(l, [this] { + if (sync_done_) return true; + auto w = std::lock_guard(wait_mutex_); + return interrupt_requested_; + }); + } + + /// Schedule a callback on the connection thread after a delay (seconds). + /// wait_sleep_done() blocks until that callback has run. + void sleep(double seconds) { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + sleep_done_ = false; + l.unlock(); + auto ms = static_cast<proton::duration::numeric_type>(seconds * 1000); + work_queue_->schedule(proton::duration(ms), [this]() { + auto l = std::lock_guard(wait_mutex_); + sleep_done_ = true; + wait_cv_.notify_all(); + }); + l.lock(); + wait_cv_.wait(l, [this] { return sleep_done_ || interrupt_requested_; }); + } + + /// Start fetching n messages; optionally timeout after timeout_seconds (0 = no timeout). + /// wait_fetch_done() blocks until n messages received or timeout (whichever first). + void fetch(int n, double timeout_seconds) { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + timed_out_ = false; + fetch_expected_ = n; + fetch_received_ = 0; + fetch_done_ = (n <= 0); + l.unlock(); + work_queue_->add([this, n]() { do_fetch(n); }); + if (timeout_seconds > 0) { + auto ms = static_cast<proton::duration::numeric_type>(timeout_seconds * 1000); + work_queue_->schedule(proton::duration(ms), [this]() { timeout_fired(); }); + } + l.lock(); + wait_cv_.wait(l, [this] { return fetch_done_ || timed_out_ || interrupt_requested_; }); + fetch_expected_ = 0; + } + int fetch_received_count() const { + auto l = std::lock_guard(wait_mutex_); + return fetch_received_; + } + bool timed_out() const { + auto l = std::lock_guard(wait_mutex_); + return timed_out_; + } + + /// Clear the timed-out flag (e.g. after the command loop has handled it). + void clear_timed_out() { + auto l = std::lock_guard(wait_mutex_); + timed_out_ = false; + } + + /// If a proton::error was recorded, assign its message to out, clear it, and return true. + bool take_last_error(std::string& out) { + auto l = std::lock_guard(wait_mutex_); + if (last_error_.empty()) + return false; + out = std::move(last_error_); + last_error_.clear(); + return true; + } + + /// Wait until N disposition settlements (on_delivery_settle) have been received, + /// or timeout_seconds (0 = no timeout). Same pattern as fetch. + void wait_settled(int n, double timeout_seconds) { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + timed_out_ = false; + settled_expected_ = n; + settled_received_ = 0; + settled_done_ = (n <= 0); + l.unlock(); + if (n <= 0) + return; + if (timeout_seconds > 0) { + auto ms = static_cast<proton::duration::numeric_type>(timeout_seconds * 1000); + work_queue_->schedule(proton::duration(ms), [this]() { timeout_fired(); }); + } + l.lock(); + wait_cv_.wait(l, [this] { return settled_done_ || timed_out_ || interrupt_requested_; }); + settled_expected_ = 0; + } + int settled_received_count() const { + auto l = std::lock_guard(wait_mutex_); + return settled_received_; + } + + // Thread-safe: schedule work on the container thread + void declare() { + work_queue_->add([this]() { do_declare(); }); + } + void commit() { + work_queue_->add([this]() { do_commit(); }); + } + void abort() { + work_queue_->add([this]() { do_abort(); }); + } + void release() { + work_queue_->add([this]() { do_release(); }); + } + void send(int n, const std::string& to_addr) { + auto l = std::unique_lock(wait_mutex_); + interrupt_requested_ = false; + send_pending_ = n; + send_to_addr_ = to_addr.empty() ? addr_ : to_addr; + l.unlock(); + work_queue_->add([this]() { try_send(); }); + l.lock(); + wait_cv_.wait(l, [this] { return send_pending_ == 0 || interrupt_requested_; }); + } + void list_unsettled() { + auto count = std::size_t(0); + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + (void)d; + ++count; + } + } + std::cout << count << " unsettled delivery(ies)" << std::endl; + for (auto rcv : session_.receivers()) { + for (auto d : rcv.unsettled_deliveries()) { + std::cout << " " << d.tag() << std::endl; + } + } + } + void quit() { + work_queue_->add([this]() { do_quit(); }); + } +}; + +using command_fn = bool (*)(tx_recv_interactive& recv, const std::vector<std::string>& args); + +static bool cmd_declare(tx_recv_interactive& recv, const std::vector<std::string>&) { + recv.declare(); + return false; +} +static bool cmd_fetch(tx_recv_interactive& recv, const std::vector<std::string>& args) { + auto n = 1; + auto timeout_seconds = 0.0; + if (!args.empty()) { + try { + n = std::stoi(args[0]); + if (n < 1) n = 1; + } catch (...) { + std::cout << "fetch: expected positive number, got '" << args[0] << "'" << std::endl; + return false; + } + } + if (args.size() >= 2) { + try { + timeout_seconds = std::stof(args[1]); + if (timeout_seconds < 0) timeout_seconds = 0; + } catch (...) { + std::cout << "fetch: expected timeout in seconds, got '" << args[1] << "'" << std::endl; + return false; + } + } + recv.fetch(n, timeout_seconds); + std::cout << "fetch: received " << recv.fetch_received_count() << " message(s)" << std::endl; + return false; +} +static bool cmd_commit(tx_recv_interactive& recv, const std::vector<std::string>&) { + recv.commit(); + return false; +} +static bool cmd_abort(tx_recv_interactive& recv, const std::vector<std::string>&) { + recv.abort(); + return false; +} +static bool cmd_unsettled(tx_recv_interactive& recv, const std::vector<std::string>&) { + recv.list_unsettled(); + return false; +} +static bool cmd_release(tx_recv_interactive& recv, const std::vector<std::string>&) { + recv.release(); + return false; +} +static bool cmd_send(tx_recv_interactive& recv, const std::vector<std::string>& args) { + auto n = 1; + auto to_addr = std::string(); + if (!args.empty()) { + try { + n = std::stoi(args[0]); + if (n < 1) n = 1; + } catch (...) { + std::cout << "send: expected positive number, got '" << args[0] << "'" << std::endl; + return false; + } + } + if (args.size() >= 2) + to_addr = args[1]; + recv.send(n, to_addr); + std::cout << "send: sent " << n << " message(s) to " << (to_addr.empty() ? "(default address)" : to_addr) << std::endl; + return false; +} +static bool cmd_wait_settled(tx_recv_interactive& recv, const std::vector<std::string>& args) { + auto n = 1; + auto timeout_seconds = 0.0; + if (!args.empty()) { + try { + n = std::stoi(args[0]); + if (n < 0) n = 0; + } catch (...) { + std::cout << "wait_settled: expected non-negative count, got '" << args[0] << "'" << std::endl; + return false; + } + } + if (args.size() >= 2) { + try { + timeout_seconds = std::stof(args[1]); + if (timeout_seconds < 0) timeout_seconds = 0; + } catch (...) { + std::cout << "wait_settled: expected timeout in seconds, got '" << args[1] << "'" << std::endl; + return false; + } + } + recv.wait_settled(n, timeout_seconds); + std::cout << "wait_settled: " << recv.settled_received_count() << " settlement(s)" << std::endl; + return false; +} +static bool cmd_sleep(tx_recv_interactive& recv, const std::vector<std::string>& args) { + if (args.empty()) { + std::cout << "sleep: expected duration in seconds (e.g. sleep 1.5)" << std::endl; + return false; + } + float seconds; + try { + seconds = std::stof(args[0]); + if (seconds < 0) seconds = 0; + } catch (...) { + std::cout << "sleep: expected number of seconds, got '" << args[0] << "'" << std::endl; + return false; + } + recv.sleep(static_cast<double>(seconds)); + return false; +} +static bool cmd_quit(tx_recv_interactive&, const std::vector<std::string>&) { + return true; +} + +static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&); + +struct command_entry { + const char* name; + const char* description; + command_fn fn; +}; + +// Lexicographically sorted by name for std::lower_bound lookup +static constexpr command_entry COMMAND_TABLE[] = { + {"abort", "Abort the current transaction", cmd_abort}, + {"commit", "Commit the current transaction", cmd_commit}, + {"declare", "Start a transaction", cmd_declare}, + {"fetch", "Receive n messages (optional timeout in seconds)", cmd_fetch}, + {"help", "Show this list of commands", cmd_help}, + {"quit", "Exit the program", cmd_quit}, + {"release", "Release all unsettled deliveries", cmd_release}, + {"send", "Send n messages to queue (optional to address; use <none> to omit)", cmd_send}, + {"sleep", "Sleep for given seconds", cmd_sleep}, + {"unsettled", "List unsettled deliveries", cmd_unsettled}, + {"wait_settled", "Wait for n disposition updates (optional timeout)", cmd_wait_settled}, +}; +static constexpr std::size_t COMMAND_TABLE_SIZE = sizeof(COMMAND_TABLE) / sizeof(COMMAND_TABLE[0]); + +static bool cmd_help(tx_recv_interactive&, const std::vector<std::string>&) { + for (std::size_t i = 0; i < COMMAND_TABLE_SIZE; ++i) { + std::cout << " " << COMMAND_TABLE[i].name << " - " << COMMAND_TABLE[i].description << "\n"; + } + return false; +} + +/// Split a string into words (by whitespace). +static std::vector<std::string> split_args(const std::string& line) { + std::vector<std::string> out; + std::istringstream is(line); + for (std::string word; is >> word;) out.push_back(word); + return out; +} + +/// Parsed command: first element is command name, remaining elements are arguments. +using parsed_command = std::vector<std::string>; + +/// Parse a line into zero or more commands separated by ';'. Each segment is +/// split into words (command name + args). Empty segments are skipped. +/// Returns a sequence that can be iterated to run each command. +static std::vector<parsed_command> parse_command_line(const std::string& line) { + std::vector<parsed_command> commands; + std::istringstream is(line); + for (std::string segment; std::getline(is, segment, ';');) { + auto args = split_args(segment); + if (!args.empty()) + commands.push_back(std::move(args)); + } + return commands; +} + +static const command_entry* find_command(std::string_view name) { + auto it = std::lower_bound(std::begin(COMMAND_TABLE), std::end(COMMAND_TABLE), name, + [](const command_entry& e, std::string_view s) { + return std::string_view(e.name) < s; + }); + if (it != std::end(COMMAND_TABLE) && std::string_view(it->name) == name) + return &*it; + return nullptr; +} + +static bool execute_command(tx_recv_interactive& recv, const command_entry& cmd, const std::vector<std::string>& args) { + auto cmd_args = std::vector<std::string>(args.begin() + 1, args.end()); + return cmd.fn(recv, cmd_args); +} + +#if defined(_WIN32) || defined(_WIN64) +static void block_interrupt() { +} + +static HANDLE g_ctrl_c_event = nullptr; + +static BOOL WINAPI win_ctrl_handler(DWORD dwCtrlType) { + if (dwCtrlType == CTRL_C_EVENT && g_ctrl_c_event != nullptr) + SetEvent(g_ctrl_c_event); + return TRUE; +} + +static void interrupt_thread(tx_recv_interactive* recv) { + HANDLE ev = CreateEventW(nullptr, TRUE, FALSE, nullptr); + if (ev == nullptr) + return; + g_ctrl_c_event = ev; + if (!SetConsoleCtrlHandler(win_ctrl_handler, TRUE)) { + CloseHandle(ev); + return; + } + for (;;) { + if (WaitForSingleObject(ev, INFINITE) == WAIT_OBJECT_0 && recv != nullptr) { + recv->request_interrupt(); + ResetEvent(ev); + } + } +} +#else +static void block_interrupt() { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGINT); + pthread_sigmask(SIG_BLOCK, &set, nullptr); +} + +static void interrupt_thread(tx_recv_interactive* recv) { + sigset_t set; + sigemptyset(&set); + sigaddset(&set, SIGINT); + for (;;) { + int sig = 0; + if (sigwait(&set, &sig) == 0 && sig == SIGINT && recv != nullptr) + recv->request_interrupt(); + } +} +#endif + +int main(int argc, char** argv) { + block_interrupt(); + + auto conn_url = std::string("//127.0.0.1:5672"); + auto addr = std::string("examples"); + auto initial_commands = std::string(); + auto opts = example::options(argc, argv); + + opts.add_value(conn_url, 'u', "url", "connection URL", "URL"); + opts.add_value(addr, 'a', "address", "address to receive messages from", "ADDR"); + opts.add_value(initial_commands, 'c', "commands", "commands to run before interactive mode (e.g. declare; fetch 2; quit)", "COMMANDS"); + + try { + opts.parse(); + + auto recv = tx_recv_interactive(conn_url, addr); + auto container = proton::container(recv); + auto container_thread = std::thread([&container]() { container.run(); }); + + std::thread interrupt_th(interrupt_thread, &recv); + interrupt_th.detach(); + + recv.wait_ready(); + + auto line = initial_commands; + bool quit_requested = false; + while (!quit_requested) { + if (line.empty()) { + std::cout << "> " << std::flush; + if (!std::getline(std::cin, line)) + break; + } + for (const auto& args : parse_command_line(line)) { + auto* cmd = find_command(args[0]); + if (cmd) { + if (execute_command(recv, *cmd, args)) { + quit_requested = true; + break; + } + if (recv.interrupted()) { + std::cout << "[interrupted]" << std::endl; + recv.clear_interrupt(); + } + if (recv.timed_out()) { + std::cout << "[timed out]" << std::endl; + recv.clear_timed_out(); + } + recv.sync_with_connection_thread(); + if (std::string err; recv.take_last_error(err)) { + std::cout << "[error: " << err << "]" <<std::endl; + } + } else { + std::cout << "Unknown command. Type 'help' for a list of commands." << std::endl; + } + } + line.clear(); + } + + recv.quit(); + container_thread.join(); + return 0; + } catch (const example::bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
