Repository: qpid-proton Updated Branches: refs/heads/master 298e7dba1 -> a5f88a1cc (forced update)
PROTON-1557: c++ improve multi-threaded clients 2 clients: - multithreaded_client.cpp: simple send thread, receive thread, run thread - multithreaded_client_flow_control: multi-connection, block for flow control Changes: - reduced needless diff between examples - use separate work_queue* to clarify separate thread safety rules from sender - took work_queue->add() out of lock to emphasize it is thread safe - use fixed argument list, same arg order Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/a5f88a1c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/a5f88a1c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/a5f88a1c Branch: refs/heads/master Commit: a5f88a1cc6af8d9758edcd76b0b6db8ffca4bd5b Parents: 68c8cf4 Author: Alan Conway <acon...@redhat.com> Authored: Mon Aug 28 11:53:35 2017 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Mon Aug 28 17:58:21 2017 -0400 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 3 +- examples/cpp/mt_queue.hpp | 102 ------- examples/cpp/multithreaded_client.cpp | 185 ++++++++++++ .../cpp/multithreaded_client_flow_control.cpp | 287 +++++++++++++++++++ examples/cpp/send_recv_mt.cpp | 269 ----------------- 5 files changed, 474 insertions(+), 372 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5f88a1c/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index df9f6a7..d116913 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -63,7 +63,8 @@ if(HAS_CPP11) # Examples that require C++11 foreach(example scheduled_send - send_recv_mt + multithreaded_client + multithreaded_client_flow_control ) add_executable(${example} ${example}.cpp) endforeach() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5f88a1c/examples/cpp/mt_queue.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt_queue.hpp b/examples/cpp/mt_queue.hpp deleted file mode 100644 index f053ebe..0000000 --- a/examples/cpp/mt_queue.hpp +++ /dev/null @@ -1,102 +0,0 @@ -#ifndef MT_QUEUE_HPP -#define MT_QUEUE_HPP - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <condition_variable> -#include <stdexcept> -#include <mutex> -#include <queue> - -class closed_error : public std::runtime_error { - public: - closed_error() : std::runtime_error("closed") {} -}; - -// A bounded, thread-safe queue. -// Objects are moved on and off the queue, not copied. Avoids overhead of copy operations. -template <class T, size_t CAPACITY> class mt_queue { - std::queue<T> q_; - std::mutex lock_; - std::condition_variable push_; - std::condition_variable pop_; - bool closed_; - - void do_push(T&& x) { - q_.push(std::move(x)); - pop_.notify_one(); - } - - T do_pop() { - T x(std::move(q_.front())); - q_.pop(); - push_.notify_one(); - return x; - } - - bool can_push() { return q_.size() < CAPACITY; } - bool can_pop() { return q_.size() > 0; } - - public: - - mt_queue() : closed_(false) {} - - void push(T&& x) { - std::unique_lock<std::mutex> l(lock_); - while(!can_push()) - push_.wait(l); - do_push(std::move(x)); - } - - T pop() { - std::unique_lock<std::mutex> l(lock_); - while(!can_pop()) - pop_.wait(l); - return do_pop(); - } - - bool try_push(T&& x) noexcept { - std::lock_guard<std::mutex> l(lock_); - bool ok = can_push(); - if (ok) - do_push(std::move(x)); - return ok; - } - - bool try_pop(T& x) noexcept { - std::lock_guard<std::mutex> l(lock_); - bool ok = can_pop(); - if (ok) - x = std::move(do_pop()); - return ok; - } - - size_t capacity() noexcept { - return CAPACITY; - } - - size_t size() noexcept { - std::lock_guard<std::mutex> l(lock_); - return q_.size(); - } -}; - - -#endif // MT_QUEUE_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5f88a1c/examples/cpp/multithreaded_client.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/multithreaded_client.cpp b/examples/cpp/multithreaded_client.cpp new file mode 100644 index 0000000..955655c --- /dev/null +++ b/examples/cpp/multithreaded_client.cpp @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// +// C++11 only +// +// A multi-threaded client that calls proton::container::run() in one thread, sends +// messages in another and receives messages in a third. +// +// Note this client does not deal with flow-control. If the sender is faster +// than the receiver, messages will build up in memory on the sending side. +// See @ref multithreaded_client_flow_control.cpp for a more complex example with +// flow control. +// +// NOTE: no proper error handling + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver.hpp> +#include <proton/sender.hpp> +#include <proton/work_queue.hpp> + +#include <condition_variable> +#include <iostream> +#include <mutex> +#include <queue> +#include <sstream> +#include <string> +#include <thread> + +// Handler for a single thread-safe sending and receiving connection. +class client : public proton::messaging_handler { + // Invariant + const std::string url_; + const std::string address_; + + // Only used in proton handler thread + proton::sender sender_; + + // Shared by proton and user threads, protected by lock_ + std::mutex lock_; + proton::work_queue *work_queue_; + std::condition_variable sender_ready_; + std::queue<proton::message> messages_; + std::condition_variable messages_ready_; + + public: + client(const std::string& url, const std::string& address) : url_(url), address_(address) {} + + // Thread safe + void send(const proton::message& msg) { + // Use [=] to copy the message, we cannot pass it by reference since it + // will be used in another thread. + work_queue()->add([=]() { sender_.send(msg); }); + } + + // Thread safe + proton::message receive() { + std::unique_lock<std::mutex> l(lock_); + while (messages_.empty()) messages_ready_.wait(l); + auto msg = std::move(messages_.front()); + messages_.pop(); + return msg; + } + + // Thread safe + void close() { + work_queue()->add([=]() { sender_.connection().close(); }); + } + + private: + + proton::work_queue* work_queue() { + // Wait till work_queue_ and sender_ are initialized. + std::unique_lock<std::mutex> l(lock_); + while (!work_queue_) sender_ready_.wait(l); + return work_queue_; + } + + // == messaging_handler overrides, only called in proton hander thread + + // Note: this example creates a connection when the container starts. + // To create connections after the container has started, use + // container::connect(). + // See @ref multithreaded_client_flow_control.cpp for an example. + void on_container_start(proton::container& cont) override { + cont.connect(url_); + } + + void on_connection_open(proton::connection& conn) override { + conn.open_sender(address_); + conn.open_receiver(address_); + } + + void on_sender_open(proton::sender& s) override { + { + // sender_ and work_queue_ must be set atomically + std::lock_guard<std::mutex> l(lock_); + sender_ = s; + work_queue_ = &s.work_queue(); + } + sender_ready_.notify_all(); + } + + void on_message(proton::delivery& dlv, proton::message& msg) override { + { + std::lock_guard<std::mutex> l(lock_); + messages_.push(msg); + } + messages_ready_.notify_all(); + } + + void on_error(const proton::error_condition& e) override { + std::cerr << "unexpected error: " << e << std::endl; + exit(1); + } +}; + +int main(int argc, const char** argv) { + try { + if (argc != 4) { + std ::cerr << + "Usage: " << argv[0] << " CONNECTION-URL AMQP-ADDRESS MESSAGE-COUNT\n" + "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n" + "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n" + "MESSAGE-COUNT: number of messages to send\n"; + return 1; + } + const char *url = argv[1]; + const char *address = argv[2]; + int n_messages = atoi(argv[3]); + + client cl(url, address); + proton::container container(cl); + std::thread container_thread([&]() { container.run(); }); + + std::thread sender([&]() { + for (int i = 0; i < n_messages; ++i) { + proton::message msg(std::to_string(i + 1)); + cl.send(msg); + std::cout << "sent: " << msg.body() << std::endl; + } + }); + + int received = 0; + std::thread receiver([&]() { + for (int i = 0; i < n_messages; ++i) { + auto msg = cl.receive(); + std::cout << "received: " << msg.body() << std::endl; + ++received; + } + }); + + sender.join(); + receiver.join(); + cl.close(); + container_thread.join(); + std::cout << "received " << received << " messages" << std::endl; + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5f88a1c/examples/cpp/multithreaded_client_flow_control.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/multithreaded_client_flow_control.cpp b/examples/cpp/multithreaded_client_flow_control.cpp new file mode 100644 index 0000000..9eec782 --- /dev/null +++ b/examples/cpp/multithreaded_client_flow_control.cpp @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +// C++11 only +// +// A multi-threaded client that sends and receives messages from multiple AMQP +// addresses. +// +// Demonstrates how to: +// +// - implement proton handlers that interact with user threads safely +// - block sender threads to respect AMQP flow control +// - use AMQP flow control to limit message buffering for receivers threads +// +// We define sender and receiver classes with simple, thread-safe blocking +// send() and receive() functions. +// +// These classes are also privately proton::message_handler instances. They use +// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex +// etc.) to pass messages between user and proton::container threads. +// +// NOTE: no proper error handling + +#include <proton/connection.hpp> +#include <proton/connection_options.hpp> +#include <proton/container.hpp> +#include <proton/message.hpp> +#include <proton/messaging_handler.hpp> +#include <proton/receiver.hpp> +#include <proton/receiver_options.hpp> +#include <proton/sender.hpp> +#include <proton/work_queue.hpp> + +#include <atomic> +#include <condition_variable> +#include <iostream> +#include <mutex> +#include <queue> +#include <sstream> +#include <string> +#include <thread> + +// A thread-safe sending connection that blocks sending threads when there +// is no AMQP credit to send messages. +class sender : private proton::messaging_handler { + // Only used in proton handler thread + proton::sender sender_; + + // Shared by proton and user threads, protected by lock_ + std::mutex lock_; + proton::work_queue *work_queue_; + std::condition_variable sender_ready_; + int queued_; // Queued messages waiting to be sent + int credit_; // AMQP credit - number of messages we can send + + public: + sender(proton::container& cont, const std::string& url, const std::string& address) + : work_queue_(0), queued_(0), credit_(0) + { + cont.open_sender(url+"/"+address, proton::connection_options().handler(*this)); + } + + // Thread safe + void send(const proton::message& m) { + { + std::unique_lock<std::mutex> l(lock_); + // Don't queue up more messages than we have credit for + while (!work_queue_ || queued_ >= credit_) sender_ready_.wait(l); + ++queued_; + } + work_queue_->add([=]() { this->do_send(m); }); // work_queue_ is thread safe + } + + // Thread safe + void close() { + work_queue()->add([=]() { sender_.connection().close(); }); + } + + private: + + proton::work_queue* work_queue() { + // Wait till work_queue_ and sender_ are initialized. + std::unique_lock<std::mutex> l(lock_); + while (!work_queue_) sender_ready_.wait(l); + return work_queue_; + } + + // == messaging_handler overrides, only called in proton hander thread + + void on_sender_open(proton::sender& s) override { + // Make sure sender_ and work_queue_ are set atomically + std::lock_guard<std::mutex> l(lock_); + sender_ = s; + work_queue_ = &s.work_queue(); + } + + void on_sendable(proton::sender& s) override { + std::lock_guard<std::mutex> l(lock_); + credit_ = s.credit(); + sender_ready_.notify_all(); // Notify senders we have credit + } + + // work_queue work items is are automatically dequeued and called by proton + // This function is called because it was queued by send() + void do_send(const proton::message& m) { + sender_.send(m); + std::lock_guard<std::mutex> l(lock_); + --queued_; // work item was consumed from the work_queue + credit_ = sender_.credit(); // update credit + sender_ready_.notify_all(); // Notify senders we have space on queue + } + + void on_error(const proton::error_condition& e) override { + std::cerr << "unexpected error: " << e << std::endl; + exit(1); + } +}; + +// A thread safe receiving connection that blocks receiving threads when there +// are no messages available, and maintains a bounded buffer of incoming +// messages by issuing AMQP credit only when there is space in the buffer. +class receiver : private proton::messaging_handler { + static const size_t MAX_BUFFER = 100; // Max number of buffered messages + + // Used in proton threads only + proton::receiver receiver_; + + // Used in proton and user threads, protected by lock_ + std::mutex lock_; + proton::work_queue* work_queue_; + std::queue<proton::message> buffer_; // Messages not yet returned by receive() + std::condition_variable can_receive_; // Notify receivers of messages + + public: + + // Connect to url + receiver(proton::container& cont, const std::string& url, const std::string& address) + : work_queue_() + { + // NOTE:credit_window(0) disables automatic flow control. + // We will use flow control to match AMQP credit to buffer capacity. + cont.open_receiver(url+"/"+address, proton::receiver_options().credit_window(0), + proton::connection_options().handler(*this)); + } + + // Thread safe receive + proton::message receive() { + std::unique_lock<std::mutex> l(lock_); + // Wait for buffered messages + while (!work_queue_ || buffer_.empty()) + can_receive_.wait(l); + proton::message m = std::move(buffer_.front()); + buffer_.pop(); + // Add a lambda to the work queue to call receive_done(). + // This will tell the handler to add more credit. + work_queue_->add([=]() { this->receive_done(); }); + return m; + } + + void close() { + std::lock_guard<std::mutex> l(lock_); + if (work_queue_) work_queue_->add([this]() { this->receiver_.connection().close(); }); + } + + private: + // ==== The following are called by proton threads only. + + void on_receiver_open(proton::receiver& r) override { + receiver_ = r; + std::lock_guard<std::mutex> l(lock_); + work_queue_ = &receiver_.work_queue(); + receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit + } + + void on_message(proton::delivery &d, proton::message &m) override { + // Proton automatically reduces credit by 1 before calling on_message + std::lock_guard<std::mutex> l(lock_); + buffer_.push(m); + can_receive_.notify_all(); + } + + // called via work_queue + void receive_done() { + // Add 1 credit, a receiver has taken a message out of the buffer. + receiver_.add_credit(1); + } + + void on_error(const proton::error_condition& e) override { + std::cerr << "unexpected error: " << e << std::endl; + exit(1); + } +}; + +// ==== Example code using the sender and receiver + +// Send n messages +void send_thread(sender& s, int n, bool print) { + auto id = std::this_thread::get_id(); + for (int i = 0; i < n; ++i) { + std::ostringstream ss; + ss << std::this_thread::get_id() << ":" << i; + s.send(proton::message(ss.str())); + if (print) std::cout << "received: " << ss.str() << std::endl; + } + std::cout << id << " sent " << n << std::endl; +} + +// Receive messages till atomic remaining count is 0. +// remaining is shared among all receiving threads +void receive_thread(receiver& r, std::atomic_int& remaining, bool print) { + auto id = std::this_thread::get_id(); + int n = 0; + while (remaining-- > 0) { + auto m = r.receive(); + ++n; + if (print) std::cout << id << "received: " << m.body() << std::endl; + } + std::cout << id << " received " << n << " messages" << std::endl; +} + +int main(int argc, const char **argv) { + try { + if (argc != 5) { + std::cerr << + "Usage: " << argv[0] << " MESSAGE-COUNT THREAD-COUNT URL\n" + "CONNECTION-URL: connection address, e.g.'amqp://127.0.0.1'\n" + "AMQP-ADDRESS: AMQP node address, e.g. 'examples'\n" + "MESSAGE-COUNT: number of messages to send\n" + "THREAD-COUNT: number of sender/receiver thread pairs\n"; + return 1; + } + + const char *url = argv[1]; + const char *address = argv[2]; + int n_messages = atoi(argv[3]); + int n_threads = atoi(argv[4]); + + // Total messages to be received, multiple receiver threads will decrement this. + std::atomic_int remaining(n_messages * n_threads); + bool print = remaining < 1000; // Don't print for long runs, dominates run time + + // Run the proton container + proton::container container; + auto container_thread = std::thread([&]() { container.run(); }); + + // A single sender and receiver to be shared by all the threads + sender send(container, url, address); + receiver recv(container, url, address); + + // Start receiver threads, then sender threads. + // Starting receivers first gives all receivers a chance to compete for messages. + std::vector<std::thread> threads; + for (int i = 0; i < n_threads; ++i) + threads.push_back(std::thread([&]() { receive_thread(recv, remaining, print); })); + for (int i = 0; i < n_threads; ++i) + threads.push_back(std::thread([&]() { send_thread(send, n_messages, print); })); + + // Wait for threads to finish + for (auto& t : threads) + t.join(); + send.close(); + recv.close(); + + container_thread.join(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5f88a1c/examples/cpp/send_recv_mt.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/send_recv_mt.cpp b/examples/cpp/send_recv_mt.cpp deleted file mode 100644 index addcbaf..0000000 --- a/examples/cpp/send_recv_mt.cpp +++ /dev/null @@ -1,269 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -// C++11 only -// -// A multi-threaded client that sends and receives messages from multiple AMQP -// addresses. -// -// Demonstrates how to: -// -// - implement proton handlers that interact with user threads safely -// - block user threads calling send() to respect AMQP flow control -// - use AMQP flow control to limit message buffering for receivers -// -// We define mt_sender and mt_receiver classes with simple, thread-safe blocking -// send() and receive() functions. -// -// These classes are also privately proton::message_handler instances. They use -// the thread-safe proton::work_queue and standard C++ synchronization (std::mutex -// etc.) to pass messages between user and proton::container threads. -// -// NOTE: no proper error handling - -#include <proton/connection.hpp> -#include <proton/connection_options.hpp> -#include <proton/container.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/receiver_options.hpp> -#include <proton/sender.hpp> -#include <proton/work_queue.hpp> - -#include <atomic> -#include <condition_variable> -#include <iostream> -#include <mutex> -#include <queue> -#include <sstream> -#include <thread> - -// Lock to serialize std::cout, std::cerr used from multiple threads. -std::mutex out_lock; -#define LOCK(EXPR) do { std::lock_guard<std::mutex> l(out_lock); EXPR; } while(0) -#define COUT(EXPR) do { LOCK(std::cout << EXPR); } while(0) -#define CERR(EXPR) do { LOCK(std::cerr << EXPR); } while(0) - -// A thread-safe sending connection. -class mt_sender : private proton::messaging_handler { - // Only used in proton thread - proton::sender sender_; - - // Shared by proton and user threads, use lock_ to protect. - std::mutex lock_; - proton::work_queue* work_queue_; // Messages waiting to be sent - std::condition_variable can_send_; // Signal sending threads - int queued_; // Queued messages waiting to be sent - int credit_; // AMQP credit - number of messages we can send - - public: - // Connect to url - mt_sender(proton::container& cont, const std::string& url) : - work_queue_(0), queued_(0), credit_(0) - { - // Pass *this as handler. - cont.open_sender(url, proton::connection_options().handler(*this)); - } - - // Thread safe send() - void send(const proton::message& m) { - std::unique_lock<std::mutex> l(lock_); - // Don't queue up more messages than we have credit for - while (!(work_queue_ && queued_ < credit_)) - can_send_.wait(l); - ++queued_; - // Add a lambda function to the work queue. - // This will call do_send() with a copy of m in the correct proton thread. - work_queue_->add([=]() { this->do_send(m); }); - } - - void close() { - std::lock_guard<std::mutex> l(lock_); - if (work_queue_) - work_queue_->add([this]() { this->sender_.connection().close(); }); - } - - private: - // ==== called by proton threads only - - void on_sender_open(proton::sender& s) override { - sender_ = s; - std::lock_guard<std::mutex> l(lock_); - work_queue_ = &s.work_queue(); - } - - void on_sendable(proton::sender& s) override { - std::lock_guard<std::mutex> l(lock_); - credit_ = s.credit(); - can_send_.notify_all(); // Notify senders we have credit - } - - // work_queue work items is are automatically dequeued and called by proton - // This function is called because it was queued by send() - void do_send(const proton::message& m) { - sender_.send(m); - std::lock_guard<std::mutex> l(lock_); - --queued_; // work item was consumed from the work_queue - credit_ = sender_.credit(); // update credit - can_send_.notify_all(); // Notify senders we have space on queue - } - - void on_error(const proton::error_condition& e) override { - CERR("unexpected error: " << e << std::endl); - exit(1); - } -}; - -// A thread safe receiving connection. -class mt_receiver : private proton::messaging_handler { - static const size_t MAX_BUFFER = 100; // Max number of buffered messages - - // Used in proton threads only - proton::receiver receiver_; - - // Used in proton and user threads, protected by lock_ - std::mutex lock_; - proton::work_queue* work_queue_; - std::queue<proton::message> buffer_; // Messages not yet returned by receive() - std::condition_variable can_receive_; // Notify receivers of messages - - public: - - // Connect to url - mt_receiver(proton::container& cont, const std::string& url) : work_queue_() - { - // NOTE:credit_window(0) disables automatic flow control. - // We will use flow control to match AMQP credit to buffer capacity. - cont.open_receiver(url, proton::receiver_options().credit_window(0), - proton::connection_options().handler(*this)); - } - - // Thread safe receive - proton::message receive() { - std::unique_lock<std::mutex> l(lock_); - // Wait for buffered messages - while (!work_queue_ || buffer_.empty()) - can_receive_.wait(l); - proton::message m = std::move(buffer_.front()); - buffer_.pop(); - // Add a lambda to the work queue to call receive_done(). - // This will tell the handler to add more credit. - work_queue_->add([=]() { this->receive_done(); }); - return m; - } - - void close() { - std::lock_guard<std::mutex> l(lock_); - if (work_queue_) - work_queue_->add([this]() { this->receiver_.connection().close(); }); - } - - private: - // ==== The following are called by proton threads only. - - void on_receiver_open(proton::receiver& r) override { - receiver_ = r; - std::lock_guard<std::mutex> l(lock_); - work_queue_ = &receiver_.work_queue(); - receiver_.add_credit(MAX_BUFFER); // Buffer is empty, initial credit is the limit - } - - void on_message(proton::delivery &d, proton::message &m) override { - // Proton automatically reduces credit by 1 before calling on_message - std::lock_guard<std::mutex> l(lock_); - buffer_.push(m); - can_receive_.notify_all(); - } - - // called via work_queue - void receive_done() { - // Add 1 credit, a receiver has taken a message out of the buffer. - receiver_.add_credit(1); - } - - void on_error(const proton::error_condition& e) override { - CERR("unexpected error: " << e << std::endl); - exit(1); - } -}; - -// ==== Example code using the mt_sender and mt_receiver - -// Send n messages -void send_thread(mt_sender& s, int n) { - for (int i = 0; i < n; ++i) { - std::ostringstream o; - o << std::this_thread::get_id() << ":" << i; - s.send(proton::message(o.str())); - } - COUT(std::this_thread::get_id() << " sent " << n << std::endl); -} - -// Receive messages till atomic remaining count is 0. -// remaining is shared among all receiving threads -void receive_thread(mt_receiver& r, std::atomic_int& remaining, bool print) { - auto id = std::this_thread::get_id(); - int n = 0; - while (remaining-- > 0) { - auto m = r.receive(); - ++n; - if (print) - COUT(id << " received \"" << m.body() << '"' << std::endl); - } - COUT(id << " received " << n << " messages" << std::endl); -} - -int main(int argc, const char **argv) { - try { - int n_threads = argc > 1 ? atoi(argv[1]) : 2; - int n_messages = argc > 2 ? atoi(argv[2]) : 10; - const char *url = argc > 3 ? argv[3] : "amqp://127.0.0.1/examples"; - std::atomic_int remaining(n_messages * n_threads); // Total messages to be received - bool print = (remaining <= 30); // Print messages for short runs only - - // Run the proton container - proton::container container; - auto container_thread = std::thread([&]() { container.run(); }); - - // A single sender and receiver to be shared by all the threads - mt_sender sender(container, url); - mt_receiver receiver(container, url); - - // Start receiver threads, then sender threads. - // Starting receivers first gives all receivers a chance to compete for messages. - std::vector<std::thread> threads; - for (int i = 0; i < n_threads; ++i) - threads.push_back(std::thread([&]() { receive_thread(receiver, remaining, print); })); - for (int i = 0; i < n_threads; ++i) - threads.push_back(std::thread([&]() { send_thread(sender, n_messages); })); - - // Wait for threads to finish - for (auto& n_messages_threads : threads) - n_messages_threads.join(); - sender.close(); - receiver.close(); - - container_thread.join(); - - return 0; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - return 1; -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org