PROTON-1400: [C++ example] Rework broker to use container event loops

Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2b2666b2
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2b2666b2
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2b2666b2

Branch: refs/heads/master
Commit: 2b2666b2758c5d493d07441a3eaad4a52d2bfbe1
Parents: 570d0a1
Author: Andrew Stitcher <[email protected]>
Authored: Tue Mar 28 13:45:28 2017 -0400
Committer: Andrew Stitcher <[email protected]>
Committed: Fri Jul 21 12:50:06 2017 -0400

----------------------------------------------------------------------
 examples/cpp/broker.cpp | 553 +++++++++++++++++++++++++++++--------------
 1 file changed, 377 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2b2666b2/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 5b0982b..2fbe077 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -25,6 +25,7 @@
 #include <proton/default_container.hpp>
 #include <proton/delivery.hpp>
 #include <proton/error_condition.hpp>
+#include <proton/function.hpp>
 #include <proton/listen_handler.hpp>
 #include <proton/listener.hpp>
 #include <proton/message.hpp>
@@ -36,238 +37,436 @@
 #include <proton/target_options.hpp>
 #include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
+#include <proton/transport.hpp>
 
-#include <atomic>
 #include <deque>
-#include <functional>
 #include <iostream>
 #include <map>
-#include <mutex>
 #include <string>
 
 #include "fake_cpp11.hpp"
 
-// Thread safe queue.
-// Stores messages, notifies subscribed connections when there is data.
-class queue {
-  public:
-    queue(const std::string& name) : name_(name) {}
+// This is a simplified model for a message broker, that only allows for messages to go to a
+// single receiver.
+//
+// Queues are only created and never destroyed
+//
+// Broker Entities (that need to be individually serialised)
+// QueueManager - Creates new queues, finds queues
+// Queue        - Queues msgs, records subscribers, sends msgs to subscribers
+// Connection   - Receives Messages from network, sends messages to network.
+
+// Work
+// FindQueue(queueName, connection) - From a Connection to the QueueManager
+//     This will create the queue if it doesn't already exist and send a BoundQueue
+//     message back to the connection.
+// BoundQueue(queue) - From the QueueManager to a Connection
+//
+// QueueMsg(msg)        - From a Connection (receiver) to a Queue
+// Subscribe(sender)    - From a Connection (sender) to a Queue
+// Flow(sender, credit) - From a Connection (sender) to a Queue
+// Unsubscribe(sender)  - From a Connection (sender) to a Queue
+//
+// SendMsg(msg)   - From a Queue to a Connection (sender)
+// Unsubscribed() - From a Queue to a Connection (sender)
+
+// Utilities to make injecting member functions palatable in C++03
+template <class T>
+struct work0 : public proton::void_function0 {
+    T& holder_;
+    void (T::* fn_)();
+
+    work0(T& h, void (T::* a)()) :
+        holder_(h), fn_(a) {}
+
+    void operator()() OVERRIDE {
+        (holder_.*fn_)();
+        delete this;
+    }
+};
+
+template <class T, class A>
+struct work1 : public proton::void_function0 {
+    T& holder_;
+    void (T::* fn_)(A);
+    A a_;
+
+    work1(T& h, void (T::* t)(A), A a) :
+        holder_(h), fn_(t), a_(a) {}
+
+    void operator()() OVERRIDE {
+        (holder_.*fn_)(a_);
+        delete this;
+    }
+};
+
+template <class T, class A, class B>
+struct work2 : public proton::void_function0 {
+    T& holder_;
+    void (T::* fn_)(A, B);
+    A a_;
+    B b_;
+
+    work2(T& h, void (T::* t)(A, B), A a, B b) :
+        holder_(h), fn_(t), a_(a), b_(b) {}
+
+    void operator()() OVERRIDE {
+        (holder_.*fn_)(a_, b_);
+        delete this;
+    }
+};
+
+template <class T, class A, class B, class C>
+struct work3 : public proton::void_function0 {
+    T& holder_;
+    void (T::* fn_)(A, B, C);
+    A a_;
+    B b_;
+    C c_;
+
+    work3(T& h, void (T::* t)(A, B, C), A a, B b, C c) :
+        holder_(h), fn_(t), a_(a), b_(b), c_(c) {}
+
+    void operator()() OVERRIDE {
+        (holder_.*fn_)(a_, b_, c_);
+        delete this;
+    }
+};
+
+template <class T>
+void defer(T* t, void (T::*f)()) {
+    work0<T>* w = new work0<T>(*t, f);
+    t->inject(*w);
+}
+
+template <class T, class A>
+void defer(T* t, void (T::*f)(A), A a) {
+    work1<T, A>* w = new work1<T, A>(*t, f, a);
+    t->inject(*w);
+}
+
+template <class T, class A, class B>
+void defer(T* t, void (T::*f)(A, B), A a, B b) {
+    work2<T, A, B>* w = new work2<T, A, B>(*t, f, a, b);
+    t->inject(*w);
+}
+
+template <class T, class A, class B, class C>
+void defer(T* t, void (T::*f)(A, B, C), A a, B b, C c) {
+    work3<T, A, B, C>* w = new work3<T, A, B, C>(*t, f, a, b, c);
+    t->inject(*w);
+}
+
+// Simple debug output
+bool verbose;
+#define DOUT(x) do {if (verbose) {x};} while (false)
+
+class Queue;
+class Sender;
 
-    std::string name() const { return name_; }
+typedef std::map<proton::sender, Sender*> senders;
 
-    // Push a message onto the queue.
-    // If the queue was previously empty, notify subscribers it has messages.
-    // Called from receiver's connection.
-    void push(const proton::message &m) {
-        std::lock_guard<std::mutex> g(lock_);
+class Sender : public proton::messaging_handler {
+    friend class connection_handler;
+
+    proton::sender sender_;
+    senders& senders_;
+    proton::event_loop& event_loop_;
+    std::string queue_name_;
+    Queue* queue_;
+    int pending_credit_;
+
+    // Messaging handlers
+    void on_sendable(proton::sender &sender) OVERRIDE;
+    void on_sender_close(proton::sender &sender) OVERRIDE;
+
+public:
+    Sender(proton::sender s, senders& ss) :
+        sender_(s), senders_(ss), 
event_loop_(make_thread_safe(s).get()->event_loop()), queue_(0), 
pending_credit_(0)
+    {}
+
+    void inject(proton::void_function0& f) {
+        event_loop_.inject(f);
+    }
+
+
+    void boundQueue(Queue* q, std::string qn);
+    void sendMsg(proton::message m) {
+        DOUT(std::cerr << "Sender:   " << this << " sending\n";);
+        sender_.send(m);
+    }
+    void unsubscribed() {
+        DOUT(std::cerr << "Sender:   " << this << " deleting\n";);
+        delete this;
+    }
+};
+
+// Queue - round robin subscriptions
+class Queue {
+    proton::event_loop event_loop_;
+    const std::string name_;
+    std::deque<proton::message> messages_;
+    typedef std::map<Sender*, int> subscriptions; // With credit
+    subscriptions subscriptions_;
+    subscriptions::iterator current_;
+
+    void tryToSend() {
+        DOUT(std::cerr << "Queue:    " << this << " tryToSend: " << 
subscriptions_.size(););
+        // Starting at current_, send messages to subscriptions with credit:
+        // After each send try to find another subscription; Wrap around;
+        // Finish when we run out of messages or credit.
+        size_t outOfCredit = 0;
+        while (!messages_.empty() && outOfCredit<subscriptions_.size()) {
+            // If we got the end (or haven't started yet) start at the beginning
+            if (current_==subscriptions_.end()) {
+                current_=subscriptions_.begin();
+            }
+            // If we have credit send the message
+            DOUT(std::cerr << "(" << current_->second << ") ";);
+            if (current_->second>0) {
+                DOUT(std::cerr << current_->first << " ";);
+                defer(current_->first, &Sender::sendMsg, messages_.front());
+                messages_.pop_front();
+                --current_->second;
+                ++current_;
+            } else {
+                ++outOfCredit;
+            }
+        }
+        DOUT(std::cerr << "\n";);
+    }
+
+public:
+    Queue(proton::container& c, const std::string& n) :
+        event_loop_(c), name_(n), current_(subscriptions_.end())
+    {}
+
+    void inject(proton::void_function0& f) {
+        event_loop_.inject(f);
+    }
+
+    void queueMsg(proton::message m) {
+        DOUT(std::cerr << "Queue:    " << this << "(" << name_ << ") 
queueMsg\n";);
         messages_.push_back(m);
-        if (messages_.size() == 1) { // Non-empty, notify subscribers
-            for (auto cb : callbacks_)
-                cb(this);
-            callbacks_.clear();
+        tryToSend();
+    }
+    void flow(Sender* s, int c) {
+        DOUT(std::cerr << "Queue:    " << this << "(" << name_ << ") flow: " 
<< c << " to " << s << "\n";);
+        subscriptions_[s] = c;
+        tryToSend();
+    }
+    void subscribe(Sender* s) {
+        DOUT(std::cerr << "Queue:    " << this << "(" << name_ << ") subscribe 
Sender: " << s << "\n";);
+        subscriptions_[s] = 0;
+    }
+    void unsubscribe(Sender* s) {
+        DOUT(std::cerr << "Queue:    " << this << "(" << name_ << ") 
unsubscribe Sender: " << s << "\n";);
+        // If we're about to erase the current subscription move on
+        if (current_ != subscriptions_.end() && current_->first==s) ++current_;
+        subscriptions_.erase(s);
+        defer(s, &Sender::unsubscribed);
+    }
+};
+
+// We have credit to send a message.
+void Sender::on_sendable(proton::sender &sender) {
+    if (queue_) {
+        defer(queue_, &Queue::flow, this, sender.credit());
+    } else {
+        pending_credit_ = sender.credit();
+    }
+}
+
+void Sender::on_sender_close(proton::sender &sender) {
+    if (queue_) {
+        defer(queue_, &Queue::unsubscribe, this);
+    } else {
+        // TODO: Is it possible to be closed before we get the queue allocated?
+        // If so, we should have a way to mark the sender deleted, so we can delete
+        // on queue binding
+    }
+    senders_.erase(sender);
+}
+
+void Sender::boundQueue(Queue* q, std::string qn) {
+    DOUT(std::cerr << "Sender:   " << this << " bound to Queue: " << q <<"(" 
<< qn << ")\n";);
+    queue_ = q;
+    queue_name_ = qn;
+
+    defer(q, &Queue::subscribe, this);
+    sender_.open(proton::sender_options()
+        .source((proton::source_options().address(queue_name_)))
+        .handler(*this));
+    if (pending_credit_>0) {
+        defer(queue_, &Queue::flow, this, pending_credit_);
+    }
+    std::cout << "sending from " << queue_name_ << std::endl;
+}
+
+class Receiver : public proton::messaging_handler {
+    friend class connection_handler;
+
+    proton::receiver receiver_;
+    proton::event_loop& event_loop_;
+    Queue* queue_;
+    std::deque<proton::message> messages_;
+
+    // A message is received.
+    void on_message(proton::delivery &, proton::message &m) OVERRIDE {
+        messages_.push_back(m);
+
+        if (queue_) {
+            queueMsgs();
         }
     }
 
-    // If the queue is not empty, pop a message into m and return true.
-    // Otherwise save callback to be called when there are messages and return false.
-    // Called from sender's connection.
-    bool pop(proton::message& m, std::function<void(queue*)> callback) {
-        std::lock_guard<std::mutex> g(lock_);
-        if (messages_.empty()) {
-            callbacks_.push_back(callback);
-            return false;
-        } else {
-            m = std::move(messages_.front());
+    void queueMsgs() {
+        DOUT(std::cerr << "Receiver: " << this << " queueing " << 
messages_.size() << " msgs to: " << queue_ << "\n";);
+        while (!messages_.empty()) {
+            defer(queue_, &Queue::queueMsg, messages_.front());
             messages_.pop_front();
-            return true;
         }
     }
 
-  private:
-    const std::string name_;
-    std::mutex lock_;
-    std::deque<proton::message> messages_;
-    std::vector<std::function<void(queue*)> > callbacks_;
+public:
+    Receiver(proton::receiver r) :
+        receiver_(r), event_loop_(make_thread_safe(r).get()->event_loop()), 
queue_(0)
+    {}
+
+    void inject(proton::void_function0& f) {
+        event_loop_.inject(f);
+    }
+
+    void boundQueue(Queue* q, std::string qn) {
+        DOUT(std::cerr << "Receiver: " << this << " bound to Queue: " << q << 
"(" << qn << ")\n";);
+        queue_ = q;
+        receiver_.open(proton::receiver_options()
+            .source((proton::source_options().address(qn)))
+            .handler(*this));
+        std::cout << "receiving to " << qn << std::endl;
+
+        queueMsgs();
+    }
 };
 
-/// Thread safe map of queues.
-class queues {
-  public:
-    queues() : next_id_(0) {}
+class QueueManager {
+    proton::container& container_;
+    proton::event_loop event_loop_;
+    typedef std::map<std::string, Queue*> queues;
+    queues queues_;
+    int next_id_; // Use to generate unique queue IDs.
 
-    // Get or create the named queue.
-    queue* get(const std::string& name) {
-        std::lock_guard<std::mutex> g(lock_);
-        auto i = queues_.insert(queue_map::value_type(name, nullptr)).first;
-        if (!i->second)
-            i->second.reset(new queue(name));
-        return i->second.get();
+public:
+    QueueManager(proton::container& c) :
+        container_(c), event_loop_(c), next_id_(0)
+    {}
+
+    void inject(proton::void_function0& f) {
+        event_loop_.inject(f);
     }
 
-    // Create a dynamic queue with a unique name.
-    queue* dynamic() {
-        std::ostringstream os;
-        os << "_dynamic_" << next_id_++;
-        return get(os.str());
+    template <class T>
+    void findQueue(T& connection, std::string& qn) {
+        if (qn.empty()) {
+            // Dynamic queue creation
+            std::ostringstream os;
+            os << "_dynamic_" << next_id_++;
+            qn = os.str();
+        }
+        Queue* q = 0;
+        queues::iterator i = queues_.find(qn);
+        if (i==queues_.end()) {
+            q = new Queue(container_, qn);
+            queues_[qn] = q;
+        } else {
+            q = i->second;
+        }
+        defer(&connection, &T::boundQueue, q, qn);
     }
 
-  private:
-    typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
+    void findQueueSender(Sender* s, std::string qn) {
+        findQueue(*s, qn);
+    }
 
-    std::mutex lock_;
-    queue_map queues_;
-    std::atomic<int> next_id_; // Use to generate unique queue IDs.
+    void findQueueReceiver(Receiver* r, std::string qn) {
+        findQueue(*r, qn);
+    }
 };
 
-/// Broker connection handler. Things to note:
-///
-/// 1. Each handler manages a single connection.
-///
-/// 2. For a *single connection* calls to proton::handler functions and calls to
-/// function objects passed to proton::event_loop::inject() are serialized,
-/// i.e. never called concurrently. Handlers can have per-connection state
-/// without needing locks.
-///
-/// 3. Handler/injected functions for *different connections* can be called
-/// concurrently. Resources used by multiple connections (e.g. the queues in
-/// this example) must be thread-safe.
-///
-/// 4. You can 'inject' work to be done sequentially using a connection's
-/// proton::event_loop. In this example, we create a std::function callback
-/// that we pass to queues, so they can notify us when they have messages.
-///
-class broker_connection_handler : public proton::messaging_handler {
-  public:
-    broker_connection_handler(queues& qs) : queues_(qs) {}
+class connection_handler : public proton::messaging_handler {
+    QueueManager& queue_manager_;
+    senders senders_;
 
-    void on_connection_open(proton::connection& c) OVERRIDE {
-        // Create the has_messages callback for queue subscriptions.
-        //
-        // Make a std::shared_ptr to a thread_safe handle for our proton::connection.
-        // The connection's proton::event_loop will remain valid as a shared_ptr exists.
-        std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = 
make_shared_thread_safe(c);
-
-        // Make a lambda function to inject a call to this->has_messages() via the proton::event_loop.
-        // The function is bound to a shared_ptr so this is safe. If the connection has already closed
-        // proton::event_loop::inject() will drop the callback.
-        has_messages_callback_ = [this, ts_c](queue* q) mutable {
-            ts_c->event_loop().inject(
-                std::bind(&broker_connection_handler::has_messages, this, q));
-        };
+public:
+    connection_handler(QueueManager& qm) :
+        queue_manager_(qm)
+    {}
 
+    void on_connection_open(proton::connection& c) OVERRIDE {
         c.open();            // Accept the connection
     }
 
     // A sender sends messages from a queue to a subscriber.
     void on_sender_open(proton::sender &sender) OVERRIDE {
-        queue *q = sender.source().dynamic() ?
-            queues_.dynamic() : queues_.get(sender.source().address());
-        
sender.open(proton::sender_options().source((proton::source_options().address(q->name()))));
-        std::cout << "sending from " << q->name() << std::endl;
-    }
-
-    // We have credit to send a message.
-    void on_sendable(proton::sender &s) OVERRIDE {
-        queue* q = sender_queue(s);
-        if (!do_send(q, s))     // Queue is empty, save ourselves in the 
blocked set.
-            blocked_.insert(std::make_pair(q, s));
+        std::string qn = sender.source().dynamic() ? "" : 
sender.source().address();
+        Sender* s = new Sender(sender, senders_);
+        senders_[sender] = s;
+        defer(&queue_manager_, &QueueManager::findQueueSender, s, qn);
     }
 
     // A receiver receives messages from a publisher to a queue.
-    void on_receiver_open(proton::receiver &r) OVERRIDE {
-        std::string qname = r.target().address();
+    void on_receiver_open(proton::receiver &receiver) OVERRIDE {
+        std::string qname = receiver.target().address();
         if (qname == "shutdown") {
             std::cout << "broker shutting down" << std::endl;
             // Sending to the special "shutdown" queue stops the broker.
-            r.connection().container().stop(
+            receiver.connection().container().stop(
                 proton::error_condition("shutdown", "stop broker"));
         } else {
-            
r.open(proton::receiver_options().target(proton::target_options().address(qname)));
-            std::cout << "receiving to " << qname << std::endl;
+            if (qname.empty()) {
+                DOUT(std::cerr << "ODD - trying to attach to a empty 
address\n";);
+            }
+            Receiver* r = new Receiver(receiver);
+            defer(&queue_manager_, &QueueManager::findQueueReceiver, r, qname);
         }
     }
 
-    // A message is received.
-    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
-        std::string qname = d.receiver().target().address();
-        queues_.get(qname)->push(m);
-    }
-
     void on_session_close(proton::session &session) OVERRIDE {
-        // Erase all blocked senders that belong to session.
-        auto predicate = [session](const proton::sender& s) {
-            return s.session() == session;
-        };
-        erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
-    }
-
-    void on_sender_close(proton::sender &sender) OVERRIDE {
-        // Erase sender from the blocked set.
-        auto range = blocked_.equal_range(sender_queue(sender));
-        auto predicate = [sender](const proton::sender& s) { return s == 
sender; };
-        erase_sender_if(range.first, range.second, predicate);
+        // Unsubscribe all senders that belong to session.
+        for (proton::sender_iterator i = session.senders().begin(); i != 
session.senders().end(); ++i) {
+            senders::iterator j = senders_.find(*i);
+            if (j == senders_.end()) continue;
+            Sender* s = j->second;
+            if (s->queue_) {
+                defer(s->queue_, &Queue::unsubscribe, s);
+            }
+            senders_.erase(j);
+        }
     }
 
     void on_error(const proton::error_condition& e) OVERRIDE {
         std::cerr << "error: " << e.what() << std::endl;
     }
-    // The container calls on_transport_close() last.
-    void on_transport_close(proton::transport&) OVERRIDE {
-        delete this;            // All done.
-    }
 
-  private:
-    typedef std::multimap<queue*, proton::sender> blocked_map;
-
-    // Get the queue associated with a sender.
-    queue* sender_queue(const proton::sender& s) {
-        return queues_.get(s.source().address()); // Thread safe.
-    }
-
-    // Only called if we have credit. Return true if we sent a message.
-    bool do_send(queue* q, proton::sender &s) {
-        proton::message m;
-        bool popped =  q->pop(m, has_messages_callback_);
-        if (popped)
-            s.send(m);
-        /// if !popped the queue has saved the callback for later.
-        return popped;
-    }
-
-    // Called via the connection's proton::event_loop when q has messages.
-    // Try all the blocked senders.
-    void has_messages(queue* q) {
-        auto range = blocked_.equal_range(q);
-        for (auto i = range.first; i != range.second;) {
-            if (i->second.credit() <= 0 || do_send(q, i->second))
-                i = blocked_.erase(i); // No credit or send was successful, 
stop blocked.
-            else
-                ++i;            // have credit, didn't send, keep blocked
-        }
-    }
-
-    // Use to erase closed senders from blocked_ set.
-    template <class Predicate>
-    void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator 
end, Predicate p) {
-        for (auto i = begin; i != end; ) {
-            if (p(i->second))
-                i = blocked_.erase(i);
-            else
-                ++i;
+    // The container calls on_transport_close() last.
+    void on_transport_close(proton::transport& t) OVERRIDE {
+        // Unsubscribe all senders.
+        for (proton::sender_iterator i = t.connection().senders().begin(); i 
!= t.connection().senders().end(); ++i) {
+            senders::iterator j = senders_.find(*i);
+            if (j == senders_.end()) continue;
+            Sender* s = j->second;
+            if (s->queue_) {
+                defer(s->queue_, &Queue::unsubscribe, s);
+            }
         }
+        delete this;            // All done.
     }
-
-    queues& queues_;
-    blocked_map blocked_;
-    std::function<void(queue*)> has_messages_callback_;
-    proton::connection connection_;
 };
 
-
 class broker {
   public:
     broker(const std::string addr) :
-        container_("mt_broker"), listener_(queues_)
+        container_("broker"), queues_(container_), listener_(queues_)
     {
         container_.listen(addr, listener_);
         std::cout << "broker listening on " << addr << std::endl;
@@ -279,21 +478,21 @@ class broker {
 
   private:
     struct listener : public proton::listen_handler {
-        listener(queues& qs) : queues_(qs) {}
+        listener(QueueManager& c) : queues_(c) {}
 
         proton::connection_options on_accept(proton::listener&) OVERRIDE{
-            return proton::connection_options().handler(*(new 
broker_connection_handler(queues_)));
+            return proton::connection_options().handler(*(new 
connection_handler(queues_)));
         }
 
         void on_error(proton::listener&, const std::string& s) OVERRIDE {
             std::cerr << "listen error: " << s << std::endl;
             throw std::runtime_error(s);
         }
-        queues& queues_;
+        QueueManager& queues_;
     };
 
-    queues queues_;
     proton::container container_;
+    QueueManager queues_;
     listener listener_;
 };
 
@@ -302,9 +501,11 @@ int main(int argc, char **argv) {
     std::string address("0.0.0.0");
     example::options opts(argc, argv);
 
+    opts.add_flag(verbose, 'v', "verbose", "verbose (debugging) output");
     opts.add_value(address, 'a', "address", "listen on URL", "URL");
 
     try {
+        verbose = false;
         opts.parse();
         broker(address).run();
         return 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to