PROTON-1400: [C++ example] Rework broker to use container event loops
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2b2666b2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2b2666b2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2b2666b2 Branch: refs/heads/master Commit: 2b2666b2758c5d493d07441a3eaad4a52d2bfbe1 Parents: 570d0a1 Author: Andrew Stitcher <[email protected]> Authored: Tue Mar 28 13:45:28 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- examples/cpp/broker.cpp | 553 +++++++++++++++++++++++++++++-------------- 1 file changed, 377 insertions(+), 176 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2b2666b2/examples/cpp/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp index 5b0982b..2fbe077 100644 --- a/examples/cpp/broker.cpp +++ b/examples/cpp/broker.cpp @@ -25,6 +25,7 @@ #include <proton/default_container.hpp> #include <proton/delivery.hpp> #include <proton/error_condition.hpp> +#include <proton/function.hpp> #include <proton/listen_handler.hpp> #include <proton/listener.hpp> #include <proton/message.hpp> @@ -36,238 +37,436 @@ #include <proton/target_options.hpp> #include <proton/thread_safe.hpp> #include <proton/tracker.hpp> +#include <proton/transport.hpp> -#include <atomic> #include <deque> -#include <functional> #include <iostream> #include <map> -#include <mutex> #include <string> #include "fake_cpp11.hpp" -// Thread safe queue. -// Stores messages, notifies subscribed connections when there is data. -class queue { - public: - queue(const std::string& name) : name_(name) {} +// This is a simplified model for a message broker, that only allows for messages to go to a +// single receiver. +// +// Queues are only created and never destroyed +// +// Broker Entities (that need to be individually serialised) +// QueueManager - Creates new queues, finds queues +// Queue - Queues msgs, records subscribers, sends msgs to subscribers +// Connection - Receives Messages from network, sends messages to network. + +// Work +// FindQueue(queueName, connection) - From a Connection to the QueueManager +// This will create the queue if it doesn't already exist and send a BoundQueue +// message back to the connection. +// BoundQueue(queue) - From the QueueManager to a Connection +// +// QueueMsg(msg) - From a Connection (receiver) to a Queue +// Subscribe(sender) - From a Connection (sender) to a Queue +// Flow(sender, credit) - From a Connection (sender) to a Queue +// Unsubscribe(sender) - From a Connection (sender) to a Queue +// +// SendMsg(msg) - From a Queue to a Connection (sender) +// Unsubscribed() - From a Queue to a Connection (sender) + +// Utilities to make injecting member functions palatable in C++03 +template <class T> +struct work0 : public proton::void_function0 { + T& holder_; + void (T::* fn_)(); + + work0(T& h, void (T::* a)()) : + holder_(h), fn_(a) {} + + void operator()() OVERRIDE { + (holder_.*fn_)(); + delete this; + } +}; + +template <class T, class A> +struct work1 : public proton::void_function0 { + T& holder_; + void (T::* fn_)(A); + A a_; + + work1(T& h, void (T::* t)(A), A a) : + holder_(h), fn_(t), a_(a) {} + + void operator()() OVERRIDE { + (holder_.*fn_)(a_); + delete this; + } +}; + +template <class T, class A, class B> +struct work2 : public proton::void_function0 { + T& holder_; + void (T::* fn_)(A, B); + A a_; + B b_; + + work2(T& h, void (T::* t)(A, B), A a, B b) : + holder_(h), fn_(t), a_(a), b_(b) {} + + void operator()() OVERRIDE { + (holder_.*fn_)(a_, b_); + delete this; + } +}; + +template <class T, class A, class B, class C> +struct work3 : public proton::void_function0 { + T& holder_; + void (T::* fn_)(A, B, C); + A a_; + B b_; + C c_; + + work3(T& h, void (T::* t)(A, B, C), A a, B b, C c) : + holder_(h), fn_(t), a_(a), b_(b), c_(c) {} + + void operator()() OVERRIDE { + (holder_.*fn_)(a_, b_, c_); + delete this; + } +}; + +template <class T> +void defer(T* t, void (T::*f)()) { + work0<T>* w = new work0<T>(*t, f); + t->inject(*w); +} + +template <class T, class A> +void defer(T* t, void (T::*f)(A), A a) { + work1<T, A>* w = new work1<T, A>(*t, f, a); + t->inject(*w); +} + +template <class T, class A, class B> +void defer(T* t, void (T::*f)(A, B), A a, B b) { + work2<T, A, B>* w = new work2<T, A, B>(*t, f, a, b); + t->inject(*w); +} + +template <class T, class A, class B, class C> +void defer(T* t, void (T::*f)(A, B, C), A a, B b, C c) { + work3<T, A, B, C>* w = new work3<T, A, B, C>(*t, f, a, b, c); + t->inject(*w); +} + +// Simple debug output +bool verbose; +#define DOUT(x) do {if (verbose) {x};} while (false) + +class Queue; +class Sender; - std::string name() const { return name_; } +typedef std::map<proton::sender, Sender*> senders; - // Push a message onto the queue. - // If the queue was previously empty, notify subscribers it has messages. - // Called from receiver's connection. - void push(const proton::message &m) { - std::lock_guard<std::mutex> g(lock_); +class Sender : public proton::messaging_handler { + friend class connection_handler; + + proton::sender sender_; + senders& senders_; + proton::event_loop& event_loop_; + std::string queue_name_; + Queue* queue_; + int pending_credit_; + + // Messaging handlers + void on_sendable(proton::sender &sender) OVERRIDE; + void on_sender_close(proton::sender &sender) OVERRIDE; + +public: + Sender(proton::sender s, senders& ss) : + sender_(s), senders_(ss), event_loop_(make_thread_safe(s).get()->event_loop()), queue_(0), pending_credit_(0) + {} + + void inject(proton::void_function0& f) { + event_loop_.inject(f); + } + + + void boundQueue(Queue* q, std::string qn); + void sendMsg(proton::message m) { + DOUT(std::cerr << "Sender: " << this << " sending\n";); + sender_.send(m); + } + void unsubscribed() { + DOUT(std::cerr << "Sender: " << this << " deleting\n";); + delete this; + } +}; + +// Queue - round robin subscriptions +class Queue { + proton::event_loop event_loop_; + const std::string name_; + std::deque<proton::message> messages_; + typedef std::map<Sender*, int> subscriptions; // With credit + subscriptions subscriptions_; + subscriptions::iterator current_; + + void tryToSend() { + DOUT(std::cerr << "Queue: " << this << " tryToSend: " << subscriptions_.size();); + // Starting at current_, send messages to subscriptions with credit: + // After each send try to find another subscription; Wrap around; + // Finish when we run out of messages or credit. + size_t outOfCredit = 0; + while (!messages_.empty() && outOfCredit<subscriptions_.size()) { + // If we got the end (or haven't started yet) start at the beginning + if (current_==subscriptions_.end()) { + current_=subscriptions_.begin(); + } + // If we have credit send the message + DOUT(std::cerr << "(" << current_->second << ") ";); + if (current_->second>0) { + DOUT(std::cerr << current_->first << " ";); + defer(current_->first, &Sender::sendMsg, messages_.front()); + messages_.pop_front(); + --current_->second; + ++current_; + } else { + ++outOfCredit; + } + } + DOUT(std::cerr << "\n";); + } + +public: + Queue(proton::container& c, const std::string& n) : + event_loop_(c), name_(n), current_(subscriptions_.end()) + {} + + void inject(proton::void_function0& f) { + event_loop_.inject(f); + } + + void queueMsg(proton::message m) { + DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") queueMsg\n";); messages_.push_back(m); - if (messages_.size() == 1) { // Non-empty, notify subscribers - for (auto cb : callbacks_) - cb(this); - callbacks_.clear(); + tryToSend(); + } + void flow(Sender* s, int c) { + DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") flow: " << c << " to " << s << "\n";); + subscriptions_[s] = c; + tryToSend(); + } + void subscribe(Sender* s) { + DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") subscribe Sender: " << s << "\n";); + subscriptions_[s] = 0; + } + void unsubscribe(Sender* s) { + DOUT(std::cerr << "Queue: " << this << "(" << name_ << ") unsubscribe Sender: " << s << "\n";); + // If we're about to erase the current subscription move on + if (current_ != subscriptions_.end() && current_->first==s) ++current_; + subscriptions_.erase(s); + defer(s, &Sender::unsubscribed); + } +}; + +// We have credit to send a message. +void Sender::on_sendable(proton::sender &sender) { + if (queue_) { + defer(queue_, &Queue::flow, this, sender.credit()); + } else { + pending_credit_ = sender.credit(); + } +} + +void Sender::on_sender_close(proton::sender &sender) { + if (queue_) { + defer(queue_, &Queue::unsubscribe, this); + } else { + // TODO: Is it possible to be closed before we get the queue allocated? + // If so, we should have a way to mark the sender deleted, so we can delete + // on queue binding + } + senders_.erase(sender); +} + +void Sender::boundQueue(Queue* q, std::string qn) { + DOUT(std::cerr << "Sender: " << this << " bound to Queue: " << q <<"(" << qn << ")\n";); + queue_ = q; + queue_name_ = qn; + + defer(q, &Queue::subscribe, this); + sender_.open(proton::sender_options() + .source((proton::source_options().address(queue_name_))) + .handler(*this)); + if (pending_credit_>0) { + defer(queue_, &Queue::flow, this, pending_credit_); + } + std::cout << "sending from " << queue_name_ << std::endl; +} + +class Receiver : public proton::messaging_handler { + friend class connection_handler; + + proton::receiver receiver_; + proton::event_loop& event_loop_; + Queue* queue_; + std::deque<proton::message> messages_; + + // A message is received. + void on_message(proton::delivery &, proton::message &m) OVERRIDE { + messages_.push_back(m); + + if (queue_) { + queueMsgs(); } } - // If the queue is not empty, pop a message into m and return true. - // Otherwise save callback to be called when there are messages and return false. - // Called from sender's connection. - bool pop(proton::message& m, std::function<void(queue*)> callback) { - std::lock_guard<std::mutex> g(lock_); - if (messages_.empty()) { - callbacks_.push_back(callback); - return false; - } else { - m = std::move(messages_.front()); + void queueMsgs() { + DOUT(std::cerr << "Receiver: " << this << " queueing " << messages_.size() << " msgs to: " << queue_ << "\n";); + while (!messages_.empty()) { + defer(queue_, &Queue::queueMsg, messages_.front()); messages_.pop_front(); - return true; } } - private: - const std::string name_; - std::mutex lock_; - std::deque<proton::message> messages_; - std::vector<std::function<void(queue*)> > callbacks_; +public: + Receiver(proton::receiver r) : + receiver_(r), event_loop_(make_thread_safe(r).get()->event_loop()), queue_(0) + {} + + void inject(proton::void_function0& f) { + event_loop_.inject(f); + } + + void boundQueue(Queue* q, std::string qn) { + DOUT(std::cerr << "Receiver: " << this << " bound to Queue: " << q << "(" << qn << ")\n";); + queue_ = q; + receiver_.open(proton::receiver_options() + .source((proton::source_options().address(qn))) + .handler(*this)); + std::cout << "receiving to " << qn << std::endl; + + queueMsgs(); + } }; -/// Thread safe map of queues. -class queues { - public: - queues() : next_id_(0) {} +class QueueManager { + proton::container& container_; + proton::event_loop event_loop_; + typedef std::map<std::string, Queue*> queues; + queues queues_; + int next_id_; // Use to generate unique queue IDs. - // Get or create the named queue. - queue* get(const std::string& name) { - std::lock_guard<std::mutex> g(lock_); - auto i = queues_.insert(queue_map::value_type(name, nullptr)).first; - if (!i->second) - i->second.reset(new queue(name)); - return i->second.get(); +public: + QueueManager(proton::container& c) : + container_(c), event_loop_(c), next_id_(0) + {} + + void inject(proton::void_function0& f) { + event_loop_.inject(f); } - // Create a dynamic queue with a unique name. - queue* dynamic() { - std::ostringstream os; - os << "_dynamic_" << next_id_++; - return get(os.str()); + template <class T> + void findQueue(T& connection, std::string& qn) { + if (qn.empty()) { + // Dynamic queue creation + std::ostringstream os; + os << "_dynamic_" << next_id_++; + qn = os.str(); + } + Queue* q = 0; + queues::iterator i = queues_.find(qn); + if (i==queues_.end()) { + q = new Queue(container_, qn); + queues_[qn] = q; + } else { + q = i->second; + } + defer(&connection, &T::boundQueue, q, qn); } - private: - typedef std::map<std::string, std::unique_ptr<queue> > queue_map; + void findQueueSender(Sender* s, std::string qn) { + findQueue(*s, qn); + } - std::mutex lock_; - queue_map queues_; - std::atomic<int> next_id_; // Use to generate unique queue IDs. + void findQueueReceiver(Receiver* r, std::string qn) { + findQueue(*r, qn); + } }; -/// Broker connection handler. Things to note: -/// -/// 1. Each handler manages a single connection. -/// -/// 2. For a *single connection* calls to proton::handler functions and calls to -/// function objects passed to proton::event_loop::inject() are serialized, -/// i.e. never called concurrently. Handlers can have per-connection state -/// without needing locks. -/// -/// 3. Handler/injected functions for *different connections* can be called -/// concurrently. Resources used by multiple connections (e.g. the queues in -/// this example) must be thread-safe. -/// -/// 4. You can 'inject' work to be done sequentially using a connection's -/// proton::event_loop. In this example, we create a std::function callback -/// that we pass to queues, so they can notify us when they have messages. -/// -class broker_connection_handler : public proton::messaging_handler { - public: - broker_connection_handler(queues& qs) : queues_(qs) {} +class connection_handler : public proton::messaging_handler { + QueueManager& queue_manager_; + senders senders_; - void on_connection_open(proton::connection& c) OVERRIDE { - // Create the has_messages callback for queue subscriptions. - // - // Make a std::shared_ptr to a thread_safe handle for our proton::connection. - // The connection's proton::event_loop will remain valid as a shared_ptr exists. - std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c); - - // Make a lambda function to inject a call to this->has_messages() via the proton::event_loop. - // The function is bound to a shared_ptr so this is safe. If the connection has already closed - // proton::event_loop::inject() will drop the callback. - has_messages_callback_ = [this, ts_c](queue* q) mutable { - ts_c->event_loop().inject( - std::bind(&broker_connection_handler::has_messages, this, q)); - }; +public: + connection_handler(QueueManager& qm) : + queue_manager_(qm) + {} + void on_connection_open(proton::connection& c) OVERRIDE { c.open(); // Accept the connection } // A sender sends messages from a queue to a subscriber. void on_sender_open(proton::sender &sender) OVERRIDE { - queue *q = sender.source().dynamic() ? - queues_.dynamic() : queues_.get(sender.source().address()); - sender.open(proton::sender_options().source((proton::source_options().address(q->name())))); - std::cout << "sending from " << q->name() << std::endl; - } - - // We have credit to send a message. - void on_sendable(proton::sender &s) OVERRIDE { - queue* q = sender_queue(s); - if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set. - blocked_.insert(std::make_pair(q, s)); + std::string qn = sender.source().dynamic() ? "" : sender.source().address(); + Sender* s = new Sender(sender, senders_); + senders_[sender] = s; + defer(&queue_manager_, &QueueManager::findQueueSender, s, qn); } // A receiver receives messages from a publisher to a queue. - void on_receiver_open(proton::receiver &r) OVERRIDE { - std::string qname = r.target().address(); + void on_receiver_open(proton::receiver &receiver) OVERRIDE { + std::string qname = receiver.target().address(); if (qname == "shutdown") { std::cout << "broker shutting down" << std::endl; // Sending to the special "shutdown" queue stops the broker. - r.connection().container().stop( + receiver.connection().container().stop( proton::error_condition("shutdown", "stop broker")); } else { - r.open(proton::receiver_options().target(proton::target_options().address(qname))); - std::cout << "receiving to " << qname << std::endl; + if (qname.empty()) { + DOUT(std::cerr << "ODD - trying to attach to a empty address\n";); + } + Receiver* r = new Receiver(receiver); + defer(&queue_manager_, &QueueManager::findQueueReceiver, r, qname); } } - // A message is received. - void on_message(proton::delivery &d, proton::message &m) OVERRIDE { - std::string qname = d.receiver().target().address(); - queues_.get(qname)->push(m); - } - void on_session_close(proton::session &session) OVERRIDE { - // Erase all blocked senders that belong to session. - auto predicate = [session](const proton::sender& s) { - return s.session() == session; - }; - erase_sender_if(blocked_.begin(), blocked_.end(), predicate); - } - - void on_sender_close(proton::sender &sender) OVERRIDE { - // Erase sender from the blocked set. - auto range = blocked_.equal_range(sender_queue(sender)); - auto predicate = [sender](const proton::sender& s) { return s == sender; }; - erase_sender_if(range.first, range.second, predicate); + // Unsubscribe all senders that belong to session. + for (proton::sender_iterator i = session.senders().begin(); i != session.senders().end(); ++i) { + senders::iterator j = senders_.find(*i); + if (j == senders_.end()) continue; + Sender* s = j->second; + if (s->queue_) { + defer(s->queue_, &Queue::unsubscribe, s); + } + senders_.erase(j); + } } void on_error(const proton::error_condition& e) OVERRIDE { std::cerr << "error: " << e.what() << std::endl; } - // The container calls on_transport_close() last. - void on_transport_close(proton::transport&) OVERRIDE { - delete this; // All done. - } - private: - typedef std::multimap<queue*, proton::sender> blocked_map; - - // Get the queue associated with a sender. - queue* sender_queue(const proton::sender& s) { - return queues_.get(s.source().address()); // Thread safe. - } - - // Only called if we have credit. Return true if we sent a message. - bool do_send(queue* q, proton::sender &s) { - proton::message m; - bool popped = q->pop(m, has_messages_callback_); - if (popped) - s.send(m); - /// if !popped the queue has saved the callback for later. - return popped; - } - - // Called via the connection's proton::event_loop when q has messages. - // Try all the blocked senders. - void has_messages(queue* q) { - auto range = blocked_.equal_range(q); - for (auto i = range.first; i != range.second;) { - if (i->second.credit() <= 0 || do_send(q, i->second)) - i = blocked_.erase(i); // No credit or send was successful, stop blocked. - else - ++i; // have credit, didn't send, keep blocked - } - } - - // Use to erase closed senders from blocked_ set. - template <class Predicate> - void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) { - for (auto i = begin; i != end; ) { - if (p(i->second)) - i = blocked_.erase(i); - else - ++i; + // The container calls on_transport_close() last. + void on_transport_close(proton::transport& t) OVERRIDE { + // Unsubscribe all senders. + for (proton::sender_iterator i = t.connection().senders().begin(); i != t.connection().senders().end(); ++i) { + senders::iterator j = senders_.find(*i); + if (j == senders_.end()) continue; + Sender* s = j->second; + if (s->queue_) { + defer(s->queue_, &Queue::unsubscribe, s); + } } + delete this; // All done. } - - queues& queues_; - blocked_map blocked_; - std::function<void(queue*)> has_messages_callback_; - proton::connection connection_; }; - class broker { public: broker(const std::string addr) : - container_("mt_broker"), listener_(queues_) + container_("broker"), queues_(container_), listener_(queues_) { container_.listen(addr, listener_); std::cout << "broker listening on " << addr << std::endl; @@ -279,21 +478,21 @@ class broker { private: struct listener : public proton::listen_handler { - listener(queues& qs) : queues_(qs) {} + listener(QueueManager& c) : queues_(c) {} proton::connection_options on_accept(proton::listener&) OVERRIDE{ - return proton::connection_options().handler(*(new broker_connection_handler(queues_))); + return proton::connection_options().handler(*(new connection_handler(queues_))); } void on_error(proton::listener&, const std::string& s) OVERRIDE { std::cerr << "listen error: " << s << std::endl; throw std::runtime_error(s); } - queues& queues_; + QueueManager& queues_; }; - queues queues_; proton::container container_; + QueueManager queues_; listener listener_; }; @@ -302,9 +501,11 @@ int main(int argc, char **argv) { std::string address("0.0.0.0"); example::options opts(argc, argv); + opts.add_flag(verbose, 'v', "verbose", "verbose (debugging) output"); opts.add_value(address, 'a', "address", "listen on URL", "URL"); try { + verbose = false; opts.parse(); broker(address).run(); return 0; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
