PROTON-1400: WIP Use the mt broker example as the example instead of the 
previous st broker
- The st broker didn't correctly respect the object access constraints from 
within handlers


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4eba80ee
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4eba80ee
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4eba80ee

Branch: refs/heads/master
Commit: 4eba80ee8527ab3145427803ad3bfdf801d79229
Parents: d168b7b
Author: Andrew Stitcher <[email protected]>
Authored: Tue Jan 24 23:36:03 2017 -0500
Committer: Andrew Stitcher <[email protected]>
Committed: Fri Jul 21 12:50:06 2017 -0400

----------------------------------------------------------------------
 examples/cpp/broker.cpp             | 352 +++++++++++---------
 examples/cpp/broker.hpp             | 236 --------------
 examples/cpp/mt/broker.cpp          | 318 ------------------
 examples/cpp/mt/epoll_container.cpp | 541 -------------------------------
 examples/cpp/mt/mt_container.hpp    |  29 --
 5 files changed, 190 insertions(+), 1286 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 97ef206..e47a2a6 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -1,5 +1,4 @@
 /*
- *
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -16,271 +15,300 @@
  * KIND, either express or implied.  See the License for the
  * specific language governing permissions and limitations
  * under the License.
- *
  */
 
 #include "options.hpp"
 
 #include <proton/connection.hpp>
+#include <proton/connection_options.hpp>
 #include <proton/container.hpp>
 #include <proton/default_container.hpp>
 #include <proton/delivery.hpp>
 #include <proton/error_condition.hpp>
+#include <proton/listen_handler.hpp>
 #include <proton/listener.hpp>
-#include <proton/messaging_handler.hpp>
 #include <proton/message.hpp>
-#include <proton/receiver_options.hpp>
-#include <proton/sender.hpp>
+#include <proton/messaging_handler.hpp>
 #include <proton/sender_options.hpp>
 #include <proton/source_options.hpp>
-#include <proton/target_options.hpp>
+#include <proton/target.hpp>
+#include <proton/thread_safe.hpp>
 #include <proton/tracker.hpp>
-#include <proton/transport.hpp>
-#include <proton/url.hpp>
 
+#include <atomic>
 #include <deque>
+#include <functional>
 #include <iostream>
-#include <list>
 #include <map>
+#include <mutex>
 #include <string>
 
 #include "fake_cpp11.hpp"
 
-/// A simple implementation of a queue.
+// Thread safe queue.
+// Stores messages, notifies subscribed connections when there is data.
 class queue {
   public:
-    queue(const std::string &name, bool dynamic = false) : name_(name), 
dynamic_(dynamic) {}
+    queue(const std::string& name) : name_(name) {}
 
     std::string name() const { return name_; }
 
-    void subscribe(proton::sender s) {
-        consumers_.push_back(s);
-    }
-
-    // Return true if queue can be deleted.
-    bool unsubscribe(proton::sender s) {
-        consumers_.remove(s);
-        return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0));
-    }
-
-    void publish(const proton::message &m) {
+    // Push a message onto the queue.
+    // If the queue was previously empty, notify subscribers it has messages.
+    // Called from receiver's connection.
+    void push(const proton::message &m) {
+        std::lock_guard<std::mutex> g(lock_);
         messages_.push_back(m);
-        dispatch(0);
-    }
-
-    void dispatch(proton::sender *s) {
-        while (deliver_to(s)) {}
-    }
-
-    bool deliver_to(proton::sender *s) {
-        // Deliver to single sender if supplied, else all consumers
-        int count = s ? 1 : consumers_.size();
-
-        if (!count) return false;
-
-        bool result = false;
-        sender_list::iterator it = consumers_.begin();
-
-        if (!s && count) {
-            s = &*it;
+        if (messages_.size() == 1) { // Non-empty, notify subscribers
+            for (auto cb : callbacks_)
+                cb(this);
+            callbacks_.clear();
         }
+    }
 
-        while (messages_.size()) {
-            if (s->credit()) {
-                const proton::message& m = messages_.front();
-
-                s->send(m);
-                messages_.pop_front();
-                result = true;
-            }
-
-            if (--count) {
-                it++;
-            } else {
-                return result;
-            }
+    // If the queue is not empty, pop a message into m and return true.
+    // Otherwise save callback to be called when there are messages and return 
false.
+    // Called from sender's connection.
+    bool pop(proton::message& m, std::function<void(queue*)> callback) {
+        std::lock_guard<std::mutex> g(lock_);
+        if (messages_.empty()) {
+            callbacks_.push_back(callback);
+            return false;
+        } else {
+            m = std::move(messages_.front());
+            messages_.pop_front();
+            return true;
         }
-
-        return false;
     }
 
   private:
-    typedef std::deque<proton::message> message_queue;
-    typedef std::list<proton::sender> sender_list;
-
-    std::string name_;
-    bool dynamic_;
-    message_queue messages_;
-    sender_list consumers_;
+    const std::string name_;
+    std::mutex lock_;
+    std::deque<proton::message> messages_;
+    std::vector<std::function<void(queue*)> > callbacks_;
 };
 
-/// A collection of queues and queue factory, used by a broker.
+/// Thread safe map of queues.
 class queues {
   public:
     queues() : next_id_(0) {}
-    virtual ~queues() {}
 
-    // Get or create a queue.
-    virtual queue &get(const std::string &address) {
-        if (address.empty()) {
-            throw std::runtime_error("empty queue name");
-        }
-
-        queue*& q = queues_[address];
-
-        if (!q) q = new queue(address);
-
-        return *q;
+    // Get or create the named queue.
+    queue* get(const std::string& name) {
+        std::lock_guard<std::mutex> g(lock_);
+        auto i = queues_.insert(queue_map::value_type(name, nullptr)).first;
+        if (!i->second)
+            i->second.reset(new queue(name));
+        return i->second.get();
     }
 
     // Create a dynamic queue with a unique name.
-    virtual queue &dynamic() {
+    queue* dynamic() {
         std::ostringstream os;
-        os << "q" << next_id_++;
-        queue *q = queues_[os.str()] = new queue(os.str(), true);
-
-        return *q;
+        os << "_dynamic_" << next_id_++;
+        return get(os.str());
     }
 
-    // Delete the named queue
-    virtual void erase(std::string &name) {
-        delete queues_[name];
-        queues_.erase(name);
-    }
+  private:
+    typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
 
-  protected:
-    typedef std::map<std::string, queue *> queue_map;
+    std::mutex lock_;
     queue_map queues_;
-    int next_id_; // Use to generate unique queue IDs.
+    std::atomic<int> next_id_; // Use to generate unique queue IDs.
 };
 
-// A handler to implement broker logic
-class broker_handler : public proton::messaging_handler {
+/// Broker connection handler. Things to note:
+///
+/// 1. Each handler manages a single connection.
+///
+/// 2. For a *single connection* calls to proton::handler functions and calls 
to
+/// function objects passed to proton::event_loop::inject() are serialized,
+/// i.e. never called concurrently. Handlers can have per-connection state
+/// without needing locks.
+///
+/// 3. Handler/injected functions for *different connections* can be called
+/// concurrently. Resources used by multiple connections (e.g. the queues in
+/// this example) must be thread-safe.
+///
+/// 4. You can 'inject' work to be done sequentially using a connection's
+/// proton::event_loop. In this example, we create a std::function callback
+/// that we pass to queues, so they can notify us when they have messages.
+///
+class broker_connection_handler : public proton::messaging_handler {
   public:
-    broker_handler(queues& qs) : queues_(qs) {}
+    broker_connection_handler(queues& qs) : queues_(qs) {}
+
+    void on_connection_open(proton::connection& c) OVERRIDE {
+        // Create the has_messages callback for queue subscriptions.
+        //
+        // Make a std::shared_ptr to a thread_safe handle for our 
proton::connection.
+        // The connection's proton::event_loop will remain valid as a 
shared_ptr exists.
+        std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = 
make_shared_thread_safe(c);
+
+        // Make a lambda function to inject a call to this->has_messages() via 
the proton::event_loop.
+        // The function is bound to a shared_ptr so this is safe. If the 
connection has already closed
+        // proton::event_loop::inject() will drop the callback.
+        has_messages_callback_ = [this, ts_c](queue* q) mutable {
+            ts_c->event_loop().inject(
+                std::bind(&broker_connection_handler::has_messages, this, q));
+        };
+
+        c.open();            // Accept the connection
+    }
 
+    // A sender sends messages from a queue to a subscriber.
     void on_sender_open(proton::sender &sender) OVERRIDE {
-        proton::source src(sender.source());
-        queue *q;
-        if (src.dynamic()) {
-            q = &queues_.dynamic();
-        } else if (!src.address().empty()) {
-            q = &queues_.get(src.address());
-        } else {
-            sender.close(proton::error_condition("No queue address supplied"));
-            return;
-        }
-        
sender.open(proton::sender_options().source(proton::source_options().address(q->name())));
-        q->subscribe(sender);
-        std::cout << "broker outgoing link from " << q->name() << std::endl;
+        queue *q = sender.source().dynamic() ?
+            queues_.dynamic() : queues_.get(sender.source().address());
+        
sender.open(proton::sender_options().source((proton::source_options().address(q->name()))));
+        std::cout << "sending from " << q->name() << std::endl;
+    }
+
+    // We have credit to send a message.
+    void on_sendable(proton::sender &s) OVERRIDE {
+        queue* q = sender_queue(s);
+        if (!do_send(q, s))     // Queue is empty, save ourselves in the 
blocked set.
+            blocked_.insert(std::make_pair(q, s));
     }
 
-    void on_receiver_open(proton::receiver &receiver) OVERRIDE {
-        std::string address = receiver.target().address();
-        if (!address.empty()) {
-            
receiver.open(proton::receiver_options().target(proton::target_options().address(address)));
-            std::cout << "broker incoming link to " << address << std::endl;
+    // A receiver receives messages from a publisher to a queue.
+    void on_receiver_open(proton::receiver &r) OVERRIDE {
+        std::string qname = r.target().address();
+        if (qname == "shutdown") {
+            std::cout << "broker shutting down" << std::endl;
+            // Sending to the special "shutdown" queue stops the broker.
+            r.connection().container().stop(
+                proton::error_condition("shutdown", "stop broker"));
         } else {
-            receiver.close(proton::error_condition("No queue address 
supplied"));
+            std::cout << "receiving to " << qname << std::endl;
         }
     }
 
-    void unsubscribe(proton::sender lnk) {
-        std::string address = lnk.source().address();
+    // A message is received.
+    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
+        std::string qname = d.receiver().target().address();
+        queues_.get(qname)->push(m);
+    }
 
-        if (queues_.get(address).unsubscribe(lnk)) {
-            queues_.erase(address);
-        }
+    void on_session_close(proton::session &session) OVERRIDE {
+        // Erase all blocked senders that belong to session.
+        auto predicate = [session](const proton::sender& s) {
+            return s.session() == session;
+        };
+        erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
     }
 
     void on_sender_close(proton::sender &sender) OVERRIDE {
-        unsubscribe(sender);
+        // Erase sender from the blocked set.
+        auto range = blocked_.equal_range(sender_queue(sender));
+        auto predicate = [sender](const proton::sender& s) { return s == 
sender; };
+        erase_sender_if(range.first, range.second, predicate);
     }
 
-    void on_connection_close(proton::connection &c) OVERRIDE {
-        remove_stale_consumers(c);
+    void on_error(const proton::error_condition& e) OVERRIDE {
+        std::cerr << "error: " << e.what() << std::endl;
     }
-
-    void on_transport_close(proton::transport &t) OVERRIDE {
-        remove_stale_consumers(t.connection());
+    // The container calls on_transport_close() last.
+    void on_transport_close(proton::transport&) OVERRIDE {
+        delete this;            // All done.
     }
 
-    void on_transport_error(proton::transport &t) OVERRIDE {
-        std::cout << "broker client disconnect: " << t.error().what() << 
std::endl;
-    }
+  private:
+    typedef std::multimap<queue*, proton::sender> blocked_map;
 
-    void on_error(const proton::error_condition &c) OVERRIDE {
-        std::cerr << "broker error: " << c.what() << std::endl;
+    // Get the queue associated with a sender.
+    queue* sender_queue(const proton::sender& s) {
+        return queues_.get(s.source().address()); // Thread safe.
     }
 
-    void remove_stale_consumers(proton::connection connection) {
-        proton::sender_range r = connection.senders();
-        for (proton::sender_iterator i = r.begin(); i != r.end(); ++i) {
-            if (i->active())
-                unsubscribe(*i);
-        }
+    // Only called if we have credit. Return true if we sent a message.
+    bool do_send(queue* q, proton::sender &s) {
+        proton::message m;
+        bool popped =  q->pop(m, has_messages_callback_);
+        if (popped)
+            s.send(m);
+        /// if !popped the queue has saved the callback for later.
+        return popped;
     }
 
-    void on_sendable(proton::sender &s) OVERRIDE {
-        std::string address = s.source().address();
-
-        queues_.get(address).dispatch(&s);
+    // Called via the connection's proton::event_loop when q has messages.
+    // Try all the blocked senders.
+    void has_messages(queue* q) {
+        auto range = blocked_.equal_range(q);
+        for (auto i = range.first; i != range.second;) {
+            if (i->second.credit() <= 0 || do_send(q, i->second))
+                i = blocked_.erase(i); // No credit or send was successful, 
stop blocked.
+            else
+                ++i;            // have credit, didn't send, keep blocked
+        }
     }
 
-    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
-        std::string address = d.receiver().target().address();
-        queues_.get(address).publish(m);
+    // Use to erase closed senders from blocked_ set.
+    template <class Predicate>
+    void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator 
end, Predicate p) {
+        for (auto i = begin; i != end; ) {
+            if (p(i->second))
+                i = blocked_.erase(i);
+            else
+                ++i;
+        }
     }
 
-  protected:
     queues& queues_;
+    blocked_map blocked_;
+    std::function<void(queue*)> has_messages_callback_;
+    proton::connection connection_;
 };
 
 
-// The broker
 class broker {
   public:
-    broker(const std::string& url) : handler_(url, queues_) {}
+    broker(const std::string addr) :
+        container_("mt_broker"), listener_(queues_)
+    {
+        container_.listen(addr, listener_);
+        std::cout << "broker listening on " << addr << std::endl;
+    }
 
-    proton::messaging_handler& handler() { return handler_; }
+    void run() {
+        container_.run(/* std::thread::hardware_concurrency() */);
+    }
 
   private:
-    class my_handler : public broker_handler {
-      public:
-        my_handler(const std::string& u, queues& qs) : broker_handler(qs), 
url_(u) {}
+    struct listener : public proton::listen_handler {
+        listener(queues& qs) : queues_(qs) {}
 
-        void on_container_start(proton::container &c) OVERRIDE {
-            c.listen(url_);
-            std::cout << "broker listening on " << url_ << std::endl;
+        proton::connection_options on_accept(proton::listener&) OVERRIDE{
+            return proton::connection_options().handler(*(new 
broker_connection_handler(queues_)));
         }
 
-      private:
-        const std::string& url_;
+        void on_error(proton::listener&, const std::string& s) OVERRIDE {
+            std::cerr << "listen error: " << s << std::endl;
+            throw std::runtime_error(s);
+        }
+        queues& queues_;
     };
 
-  private:
     queues queues_;
-    my_handler handler_;
+    proton::container container_;
+    listener listener_;
 };
 
 int main(int argc, char **argv) {
-    std::string url("0.0.0.0");
+    // Command line options
+    std::string address("0.0.0.0");
     example::options opts(argc, argv);
 
-    opts.add_value(url, 'a', "address", "listen on URL", "URL");
+    opts.add_value(address, 'a', "address", "listen on URL", "URL");
 
     try {
         opts.parse();
-
-        broker b(url);
-        proton::default_container(b.handler()).run();
-
+        broker(address).run();
         return 0;
     } catch (const example::bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;
     } catch (const std::exception& e) {
-        std::cerr << e.what() << std::endl;
+        std::cerr << "broker shutdown: " << e.what() << std::endl;
     }
-
     return 1;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/broker.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp
deleted file mode 100644
index 953713f..0000000
--- a/examples/cpp/broker.hpp
+++ /dev/null
@@ -1,236 +0,0 @@
-#ifndef BROKER_HPP
-#define BROKER_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/// @file
-///
-/// Common code used by different broker examples.
-///
-/// The examples add functionality as needed, this helps to make it
-/// easier to see the important differences between the examples.
-
-#include <proton/connection.hpp>
-#include <proton/delivery.hpp>
-#include <proton/messaging_handler.hpp>
-#include <proton/message.hpp>
-#include <proton/sasl.hpp>
-#include <proton/sender.hpp>
-#include <proton/tracker.hpp>
-#include <proton/transport.hpp>
-#include <proton/sender_options.hpp>
-#include <proton/receiver_options.hpp>
-#include <proton/source_options.hpp>
-#include <proton/target_options.hpp>
-
-#include <iostream>
-#include <deque>
-#include <map>
-#include <list>
-#include <sstream>
-
-/// A simple implementation of a queue.
-class queue {
-  public:
-    queue(const std::string &name, bool dynamic = false) : name_(name), 
dynamic_(dynamic) {}
-
-    std::string name() const { return name_; }
-
-    void subscribe(proton::sender s) {
-        consumers_.push_back(s);
-    }
-
-    // Return true if queue can be deleted.
-    bool unsubscribe(proton::sender s) {
-        consumers_.remove(s);
-        return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0));
-    }
-
-    void publish(const proton::message &m) {
-        messages_.push_back(m);
-        dispatch(0);
-    }
-
-    void dispatch(proton::sender *s) {
-        while (deliver_to(s)) {}
-    }
-
-    bool deliver_to(proton::sender *s) {
-        // Deliver to single sender if supplied, else all consumers
-        int count = s ? 1 : consumers_.size();
-
-        if (!count) return false;
-
-        bool result = false;
-        sender_list::iterator it = consumers_.begin();
-
-        if (!s && count) {
-            s = &*it;
-        }
-
-        while (messages_.size()) {
-            if (s->credit()) {
-                const proton::message& m = messages_.front();
-
-                s->send(m);
-                messages_.pop_front();
-                result = true;
-            }
-
-            if (--count) {
-                it++;
-            } else {
-                return result;
-            }
-        }
-
-        return false;
-    }
-
-  private:
-    typedef std::deque<proton::message> message_queue;
-    typedef std::list<proton::sender> sender_list;
-
-    std::string name_;
-    bool dynamic_;
-    message_queue messages_;
-    sender_list consumers_;
-};
-
-/// A collection of queues and queue factory, used by a broker.
-class queues {
-  public:
-    queues() : next_id_(0) {}
-    virtual ~queues() {}
-
-    // Get or create a queue.
-    virtual queue &get(const std::string &address = std::string()) {
-        if (address.empty()) {
-            throw std::runtime_error("empty queue name");
-        }
-
-        queue*& q = queues_[address];
-
-        if (!q) q = new queue(address);
-
-        return *q;
-    }
-
-    // Create a dynamic queue with a unique name.
-    virtual queue &dynamic() {
-        std::ostringstream os;
-        os << "q" << next_id_++;
-        queue *q = queues_[os.str()] = new queue(os.str(), true);
-
-        return *q;
-    }
-
-    // Delete the named queue
-    virtual void erase(std::string &name) {
-        delete queues_[name];
-        queues_.erase(name);
-    }
-
-  protected:
-    typedef std::map<std::string, queue *> queue_map;
-    queue_map queues_;
-    int next_id_; // Use to generate unique queue IDs.
-};
-
-#include <proton/config.hpp>
-
-/** Common handler logic for brokers. */
-class broker_handler : public proton::messaging_handler {
-  public:
-    broker_handler(queues& qs) : queues_(qs) {}
-
-    void on_transport_open(proton::transport &t) OVERRIDE {
-        std::cout << "Connection from user: " << t.sasl().user() << " 
(mechanism: " << t.sasl().mech() << ")" << std::endl;
-    }
-
-    void on_sender_open(proton::sender &sender) OVERRIDE {
-        proton::source src(sender.source());
-        queue &q = src.dynamic() ?
-            queues_.dynamic() : queues_.get(src.address());
-        
sender.open(proton::sender_options().source(proton::source_options().address(q.name())));
-        q.subscribe(sender);
-        std::cout << "broker outgoing link from " << q.name() << std::endl;
-    }
-
-    void on_receiver_open(proton::receiver &receiver) OVERRIDE {
-        std::string address = receiver.target().address();
-        if (!address.empty()) {
-            
receiver.open(proton::receiver_options().target(proton::target_options().address(address)));
-            std::cout << "broker incoming link to " << address << std::endl;
-        }
-    }
-
-    void unsubscribe(proton::sender lnk) {
-        std::string address = lnk.source().address();
-
-        if (queues_.get(address).unsubscribe(lnk)) {
-            queues_.erase(address);
-        }
-    }
-
-    void on_sender_close(proton::sender &sender) OVERRIDE {
-        unsubscribe(sender);
-    }
-
-    void on_connection_close(proton::connection &c) OVERRIDE {
-        remove_stale_consumers(c);
-    }
-
-    void on_transport_close(proton::transport &t) OVERRIDE {
-        remove_stale_consumers(t.connection());
-    }
-
-    void on_transport_error(proton::transport &t) OVERRIDE {
-        std::cout << "broker client disconnect: " << t.error().what() << 
std::endl;
-    }
-
-    void on_error(const proton::error_condition &c) OVERRIDE {
-        std::cerr << "broker error: " << c.what() << std::endl;
-    }
-
-    void remove_stale_consumers(proton::connection connection) {
-        proton::sender_range sr = connection.senders();
-        for (proton::sender_iterator i = sr.begin(); i != sr.end(); ++i) {
-            if (i->active())
-                unsubscribe(*i);
-        }
-    }
-
-    void on_sendable(proton::sender &s) OVERRIDE {
-        std::string address = s.source().address();
-
-        queues_.get(address).dispatch(&s);
-    }
-
-    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
-        std::string address = d.receiver().target().address();
-        queues_.get(address).publish(m);
-    }
-
-  protected:
-    queues& queues_;
-};
-
-#endif // BROKER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/mt/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp
deleted file mode 100644
index 83b7005..0000000
--- a/examples/cpp/mt/broker.cpp
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "../options.hpp"
-#include "mt_container.hpp"
-
-#include <proton/connection.hpp>
-#include <proton/connection_options.hpp>
-#include <proton/container.hpp>
-#include <proton/default_container.hpp>
-#include <proton/delivery.hpp>
-#include <proton/error_condition.hpp>
-#include <proton/listen_handler.hpp>
-#include <proton/listener.hpp>
-#include <proton/message.hpp>
-#include <proton/messaging_handler.hpp>
-#include <proton/sender_options.hpp>
-#include <proton/source_options.hpp>
-#include <proton/target.hpp>
-#include <proton/thread_safe.hpp>
-#include <proton/tracker.hpp>
-
-#include <atomic>
-#include <deque>
-#include <functional>
-#include <iostream>
-#include <map>
-#include <mutex>
-#include <thread>
-
-#include "../fake_cpp11.hpp"
-
-// Thread safe queue.
-// Stores messages, notifies subscribed connections when there is data.
-class queue {
-  public:
-    queue(const std::string& name) : name_(name) {}
-
-    std::string name() const { return name_; }
-
-    // Push a message onto the queue.
-    // If the queue was previously empty, notify subscribers it has messages.
-    // Called from receiver's connection.
-    void push(const proton::message &m) {
-        std::lock_guard<std::mutex> g(lock_);
-        messages_.push_back(m);
-        if (messages_.size() == 1) { // Non-empty, notify subscribers
-            for (auto cb : callbacks_)
-                cb(this);
-            callbacks_.clear();
-        }
-    }
-
-    // If the queue is not empty, pop a message into m and return true.
-    // Otherwise save callback to be called when there are messages and return 
false.
-    // Called from sender's connection.
-    bool pop(proton::message& m, std::function<void(queue*)> callback) {
-        std::lock_guard<std::mutex> g(lock_);
-        if (messages_.empty()) {
-            callbacks_.push_back(callback);
-            return false;
-        } else {
-            m = std::move(messages_.front());
-            messages_.pop_front();
-            return true;
-        }
-    }
-
-  private:
-    const std::string name_;
-    std::mutex lock_;
-    std::deque<proton::message> messages_;
-    std::vector<std::function<void(queue*)> > callbacks_;
-};
-
-/// Thread safe map of queues.
-class queues {
-  public:
-    queues() : next_id_(0) {}
-
-    // Get or create the named queue.
-    queue* get(const std::string& name) {
-        std::lock_guard<std::mutex> g(lock_);
-        auto i = queues_.insert(queue_map::value_type(name, nullptr)).first;
-        if (!i->second)
-            i->second.reset(new queue(name));
-        return i->second.get();
-    }
-
-    // Create a dynamic queue with a unique name.
-    queue* dynamic() {
-        std::ostringstream os;
-        os << "_dynamic_" << next_id_++;
-        return get(os.str());
-    }
-
-  private:
-    typedef std::map<std::string, std::unique_ptr<queue> > queue_map;
-
-    std::mutex lock_;
-    queue_map queues_;
-    std::atomic<int> next_id_; // Use to generate unique queue IDs.
-};
-
-/// Broker connection handler. Things to note:
-///
-/// 1. Each handler manages a single connection.
-///
-/// 2. For a *single connection* calls to proton::handler functions and calls 
to
-/// function objects passed to proton::event_loop::inject() are serialized,
-/// i.e. never called concurrently. Handlers can have per-connection state
-/// without needing locks.
-///
-/// 3. Handler/injected functions for *different connections* can be called
-/// concurrently. Resources used by multiple connections (e.g. the queues in
-/// this example) must be thread-safe.
-///
-/// 4. You can 'inject' work to be done sequentially using a connection's
-/// proton::event_loop. In this example, we create a std::function callback
-/// that we pass to queues, so they can notify us when they have messages.
-///
-class broker_connection_handler : public proton::messaging_handler {
-  public:
-    broker_connection_handler(queues& qs) : queues_(qs) {}
-
-    void on_connection_open(proton::connection& c) OVERRIDE {
-        // Create the has_messages callback for queue subscriptions.
-        //
-        // Make a std::shared_ptr to a thread_safe handle for our 
proton::connection.
-        // The connection's proton::event_loop will remain valid as a 
shared_ptr exists.
-        std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = 
make_shared_thread_safe(c);
-
-        // Make a lambda function to inject a call to this->has_messages() via 
the proton::event_loop.
-        // The function is bound to a shared_ptr so this is safe. If the 
connection has already closed
-        // proton::event_loop::inject() will drop the callback.
-        has_messages_callback_ = [this, ts_c](queue* q) mutable {
-            ts_c->event_loop()->inject(
-                std::bind(&broker_connection_handler::has_messages, this, q));
-        };
-
-        c.open();            // Accept the connection
-    }
-
-    // A sender sends messages from a queue to a subscriber.
-    void on_sender_open(proton::sender &sender) OVERRIDE {
-        queue *q = sender.source().dynamic() ?
-            queues_.dynamic() : queues_.get(sender.source().address());
-        
sender.open(proton::sender_options().source((proton::source_options().address(q->name()))));
-        std::cout << "sending from " << q->name() << std::endl;
-    }
-
-    // We have credit to send a message.
-    void on_sendable(proton::sender &s) OVERRIDE {
-        queue* q = sender_queue(s);
-        if (!do_send(q, s))     // Queue is empty, save ourselves in the 
blocked set.
-            blocked_.insert(std::make_pair(q, s));
-    }
-
-    // A receiver receives messages from a publisher to a queue.
-    void on_receiver_open(proton::receiver &r) OVERRIDE {
-        std::string qname = r.target().address();
-        if (qname == "shutdown") {
-            std::cout << "broker shutting down" << std::endl;
-            // Sending to the special "shutdown" queue stops the broker.
-            r.connection().container().stop(
-                proton::error_condition("shutdown", "stop broker"));
-        } else {
-            std::cout << "receiving to " << qname << std::endl;
-        }
-    }
-
-    // A message is received.
-    void on_message(proton::delivery &d, proton::message &m) OVERRIDE {
-        std::string qname = d.receiver().target().address();
-        queues_.get(qname)->push(m);
-    }
-
-    void on_session_close(proton::session &session) OVERRIDE {
-        // Erase all blocked senders that belong to session.
-        auto predicate = [session](const proton::sender& s) {
-            return s.session() == session;
-        };
-        erase_sender_if(blocked_.begin(), blocked_.end(), predicate);
-    }
-
-    void on_sender_close(proton::sender &sender) OVERRIDE {
-        // Erase sender from the blocked set.
-        auto range = blocked_.equal_range(sender_queue(sender));
-        auto predicate = [sender](const proton::sender& s) { return s == 
sender; };
-        erase_sender_if(range.first, range.second, predicate);
-    }
-
-    void on_error(const proton::error_condition& e) OVERRIDE {
-        std::cerr << "error: " << e.what() << std::endl;
-    }
-    // The container calls on_transport_close() last.
-    void on_transport_close(proton::transport&) OVERRIDE {
-        delete this;            // All done.
-    }
-
-  private:
-    typedef std::multimap<queue*, proton::sender> blocked_map;
-
-    // Get the queue associated with a sender.
-    queue* sender_queue(const proton::sender& s) {
-        return queues_.get(s.source().address()); // Thread safe.
-    }
-
-    // Only called if we have credit. Return true if we sent a message.
-    bool do_send(queue* q, proton::sender &s) {
-        proton::message m;
-        bool popped =  q->pop(m, has_messages_callback_);
-        if (popped)
-            s.send(m);
-        /// if !popped the queue has saved the callback for later.
-        return popped;
-    }
-
-    // Called via the connection's proton::event_loop when q has messages.
-    // Try all the blocked senders.
-    void has_messages(queue* q) {
-        auto range = blocked_.equal_range(q);
-        for (auto i = range.first; i != range.second;) {
-            if (i->second.credit() <= 0 || do_send(q, i->second))
-                i = blocked_.erase(i); // No credit or send was successful, 
stop blocked.
-            else
-                ++i;            // have credit, didn't send, keep blocked
-        }
-    }
-
-    // Use to erase closed senders from blocked_ set.
-    template <class Predicate>
-    void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator 
end, Predicate p) {
-        for (auto i = begin; i != end; ) {
-            if (p(i->second))
-                i = blocked_.erase(i);
-            else
-                ++i;
-        }
-    }
-
-    queues& queues_;
-    blocked_map blocked_;
-    std::function<void(queue*)> has_messages_callback_;
-    proton::connection connection_;
-};
-
-
-class broker {
-  public:
-    broker(const std::string addr) :
-        container_(make_mt_container("mt_broker")), listener_(queues_)
-    {
-        container_->listen(addr, listener_);
-        std::cout << "broker listening on " << addr << std::endl;
-    }
-
-    void run() {
-        std::vector<std::thread> 
threads(std::thread::hardware_concurrency()-1);
-        for (auto& t : threads)
-            t = std::thread(&proton::container::run, container_.get());
-        container_->run();      // Use this thread too.
-        for (auto& t : threads)
-            t.join();
-    }
-
-  private:
-    struct listener : public proton::listen_handler {
-        listener(queues& qs) : queues_(qs) {}
-
-        proton::connection_options on_accept() OVERRIDE{
-            return proton::connection_options().handler(*(new 
broker_connection_handler(queues_)));
-        }
-
-        void on_error(const std::string& s) OVERRIDE {
-            std::cerr << "listen error: " << s << std::endl;
-            throw std::runtime_error(s);
-        }
-        queues& queues_;
-    };
-
-    queues queues_;
-    std::unique_ptr<proton::container> container_;
-    listener listener_;
-};
-
-int main(int argc, char **argv) {
-    // Command line options
-    std::string address("0.0.0.0");
-    example::options opts(argc, argv);
-    opts.add_value(address, 'a', "address", "listen on URL", "URL");
-    try {
-        opts.parse();
-        broker(address).run();
-        return 0;
-    } catch (const example::bad_option& e) {
-        std::cout << opts << std::endl << e.what() << std::endl;
-    } catch (const std::exception& e) {
-        std::cerr << "broker shutdown: " << e.what() << std::endl;
-    }
-    return 1;
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/mt/epoll_container.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_container.cpp 
b/examples/cpp/mt/epoll_container.cpp
deleted file mode 100644
index 5643fcc..0000000
--- a/examples/cpp/mt/epoll_container.cpp
+++ /dev/null
@@ -1,541 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "mt_container.hpp"
-
-#include <proton/default_container.hpp>
-#include <proton/event_loop.hpp>
-#include <proton/listen_handler.hpp>
-#include <proton/url.hpp>
-
-#include <proton/io/container_impl_base.hpp>
-#include <proton/io/connection_driver.hpp>
-#include <proton/io/link_namer.hpp>
-
-#include <atomic>
-#include <memory>
-#include <mutex>
-#include <condition_variable>
-#include <thread>
-#include <set>
-#include <sstream>
-#include <system_error>
-
-// Linux native IO
-#include <assert.h>
-#include <errno.h>
-#include <fcntl.h>
-#include <netdb.h>
-#include <sys/epoll.h>
-#include <sys/eventfd.h>
-#include <unistd.h>
-
-#include "../fake_cpp11.hpp"
-
-// Private implementation
-namespace  {
-
-
-using lock_guard = std::lock_guard<std::mutex>;
-
-// Get string from errno
-std::string errno_str(const std::string& msg) {
-    return std::system_error(errno, std::system_category(), msg).what();
-}
-
-// Throw proton::error(errno_str(msg)) if result < 0
-int check(int result, const std::string& msg) {
-    if (result < 0)
-        throw proton::error(errno_str(msg));
-    return result;
-}
-
-// Wrapper for getaddrinfo() that cleans up in destructor.
-class unique_addrinfo {
-  public:
-    unique_addrinfo(const std::string& addr) : addrinfo_(0) {
-        proton::url u(addr);
-        int result = ::getaddrinfo(char_p(u.host()), char_p(u.port()), 0, 
&addrinfo_);
-        if (result)
-            throw proton::error(std::string("bad address: ") + 
gai_strerror(result));
-    }
-    ~unique_addrinfo() { if (addrinfo_) ::freeaddrinfo(addrinfo_); }
-
-    ::addrinfo* operator->() const { return addrinfo_; }
-
-  private:
-    static const char* char_p(const std::string& s) { return s.empty() ? 0 : 
s.c_str(); }
-    ::addrinfo *addrinfo_;
-};
-
-// File descriptor wrapper that calls ::close in destructor.
-class unique_fd {
-  public:
-    unique_fd(int fd) : fd_(fd) {}
-    ~unique_fd() { if (fd_ >= 0) ::close(fd_); }
-    operator int() const { return fd_; }
-    int release() { int ret = fd_; fd_ = -1; return ret; }
-
-  protected:
-    int fd_;
-};
-
-class pollable;
-class pollable_driver;
-class pollable_listener;
-
-class epoll_container : public proton::io::container_impl_base {
-  public:
-    epoll_container(const std::string& id);
-    ~epoll_container();
-
-    // Pull in base class functions here so that name search finds all the 
overloads
-    using standard_container::stop;
-    using standard_container::connect;
-    using standard_container::listen;
-
-    proton::returned<proton::connection> connect(
-        const std::string& addr, const proton::connection_options& opts) 
OVERRIDE;
-
-    proton::listener listen(const std::string& addr, proton::listen_handler&) 
OVERRIDE;
-
-    void stop_listening(const std::string& addr) OVERRIDE;
-
-    void run() OVERRIDE;
-    void auto_stop(bool) OVERRIDE;
-    void stop(const proton::error_condition& err) OVERRIDE;
-
-    std::string id() const OVERRIDE { return id_; }
-
-    // Functions used internally.
-    proton::connection add_driver(proton::connection_options opts, int fd, 
bool server);
-    void erase(pollable*);
-
-    // Link names must be unique per container.
-    // Generate unique names with a simple atomic counter.
-    class atomic_link_namer : public proton::io::link_namer {
-      public:
-        std::string link_name() {
-            std::ostringstream o;
-            o << std::hex << ++count_;
-            return o.str();
-        }
-      private:
-        std::atomic<int> count_;
-    };
-
-     // TODO aconway 2016-06-07: Unfinished
-    void schedule(proton::duration, std::function<void()>) OVERRIDE { throw 
std::logic_error("not implemented"); }
-    void schedule(proton::duration, proton::void_function0&) OVERRIDE { throw 
std::logic_error("not implemented"); }
-    atomic_link_namer link_namer;
-
-  private:
-    template <class T> void store(T& v, const T& x) const { lock_guard 
g(lock_); v = x; }
-
-    void idle_check(const lock_guard&);
-    void interrupt();
-    void wait();
-
-    const std::string id_;
-    const unique_fd epoll_fd_;
-    const unique_fd interrupt_fd_;
-
-    mutable std::mutex lock_;
-
-    proton::connection_options options_;
-    std::map<std::string, std::unique_ptr<pollable_listener> > listeners_;
-    std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_;
-
-    std::condition_variable stopped_;
-    bool stopping_;
-    proton::error_condition stop_err_;
-    std::atomic<size_t> threads_;
-};
-
-// Base class for pollable file-descriptors. Manages epoll interaction,
-// subclasses implement virtual work() to do their serialized work.
-class pollable {
-  public:
-    pollable(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), 
notified_(false), working_(false)
-    {
-        int flags = check(::fcntl(fd, F_GETFL, 0), "non-blocking");
-        check(::fcntl(fd, F_SETFL,  flags | O_NONBLOCK), "non-blocking");
-        ::epoll_event ev = {};
-        ev.data.ptr = this;
-        ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev);
-    }
-
-    virtual ~pollable() {
-        ::epoll_event ev = {};
-        ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); // Ignore errors.
-    }
-
-    bool do_work(uint32_t events) {
-        {
-            lock_guard g(lock_);
-            if (working_)
-                return true;         // Another thread is already working.
-            working_ = true;
-            notified_ = false;
-        }
-        uint32_t new_events = work(events);  // Serialized, outside the lock.
-        if (new_events) {
-            lock_guard g(lock_);
-            rearm(notified_ ?  EPOLLIN|EPOLLOUT : new_events);
-        }
-        return new_events;
-    }
-
-    // Called from any thread to wake up the connection handler.
-    void notify() {
-        lock_guard g(lock_);
-        if (!notified_) {
-            notified_ = true;
-            if (!working_) // No worker thread, rearm now.
-                rearm(EPOLLIN|EPOLLOUT);
-        }
-    }
-
-  protected:
-
-    // Subclass implements  work.
-    // Returns epoll events to re-enable or 0 if finished.
-    virtual uint32_t work(uint32_t events) = 0;
-
-    const unique_fd fd_;
-    const int epoll_fd_;
-
-  private:
-
-    void rearm(uint32_t events) {
-        epoll_event ev;
-        ev.data.ptr = this;
-        ev.events = EPOLLONESHOT | events;
-        check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), "re-arm epoll");
-        working_ = false;
-    }
-
-    std::mutex lock_;
-    bool notified_;
-    bool working_;
-};
-
-class epoll_event_loop : public proton::event_loop {
-  public:
-    typedef std::vector<std::function<void()> > jobs;
-
-    epoll_event_loop(pollable& p) : pollable_(p), closed_(false) {}
-
-    bool inject(std::function<void()> f) OVERRIDE {
-        // Note this is an unbounded work queue.
-        // A resource-safe implementation should be bounded.
-        lock_guard g(lock_);
-        if (closed_)
-            return false;
-        jobs_.push_back(f);
-        pollable_.notify();
-        return true;
-    }
-
-    bool inject(proton::void_function0& f) OVERRIDE {
-        return inject([&f]() { f(); });
-    }
-
-    jobs pop_all() {
-        lock_guard g(lock_);
-        return std::move(jobs_);
-    }
-
-    void close() {
-        lock_guard g(lock_);
-        closed_ = true;
-    }
-
-  private:
-    std::mutex lock_;
-    pollable& pollable_;
-    jobs jobs_;
-    bool closed_;
-};
-
-// Handle epoll wakeups for a connection_driver.
-class pollable_driver : public pollable {
-  public:
-    pollable_driver(epoll_container& c, int fd, int epoll_fd) :
-        pollable(fd, epoll_fd),
-        loop_(new epoll_event_loop(*this)),
-        driver_(c, loop_)
-    {
-        proton::connection conn = driver_.connection();
-        proton::io::set_link_namer(conn, c.link_namer);
-    }
-
-    ~pollable_driver() {
-        loop_->close();                // No calls to notify() after this.
-        driver_.dispatch();            // Run any final events.
-        try { write(); } catch(...) {} // Write connection close if we can.
-        for (auto f : loop_->pop_all()) {// Run final queued work for 
side-effects.
-            try { f(); } catch(...) {}
-        }
-    }
-
-    uint32_t work(uint32_t events) {
-        try {
-            bool can_read = events & EPOLLIN, can_write = events & EPOLLOUT;
-            do {
-                can_write = can_write && write();
-                can_read = can_read && read();
-                for (auto f : loop_->pop_all()) // Run queued work
-                    f();
-                driver_.dispatch();
-            } while (can_read || can_write);
-            return (driver_.read_buffer().size ? EPOLLIN:0) |
-                (driver_.write_buffer().size ? EPOLLOUT:0);
-        } catch (const std::exception& e) {
-            driver_.disconnected(proton::error_condition("exception", 
e.what()));
-        }
-        return 0;               // Ending
-    }
-
-    proton::io::connection_driver& driver() { return driver_; }
-
-  private:
-    static bool try_again(int e) {
-        // These errno values from read or write mean "try again"
-        return (e == EAGAIN || e == EWOULDBLOCK || e == EINTR);
-    }
-
-    bool write() {
-        proton::io::const_buffer wbuf(driver_.write_buffer());
-        if (wbuf.size) {
-            ssize_t n = ::write(fd_, wbuf.data, wbuf.size);
-            if (n > 0) {
-                driver_.write_done(n);
-                return true;
-            } else if (n < 0 && !try_again(errno)) {
-                check(n, "write");
-            }
-        }
-        return false;
-    }
-
-    bool read() {
-        proton::io::mutable_buffer rbuf(driver_.read_buffer());
-        if (rbuf.size) {
-            ssize_t n = ::read(fd_, rbuf.data, rbuf.size);
-            if (n > 0) {
-                driver_.read_done(n);
-                return true;
-            }
-            else if (n == 0)
-                driver_.read_close();
-            else if (!try_again(errno))
-                check(n, "read");
-        }
-        return false;
-    }
-
-    // Lifecycle note: loop_ belongs to the proton::connection, which can live
-    // longer than the driver if the application holds a reference to it, we
-    // disconnect ourselves with loop_->close() in ~connection_driver()
-    epoll_event_loop* loop_;
-    proton::io::connection_driver driver_;
-};
-
-// A pollable listener fd that creates pollable_driver for incoming 
connections.
-class pollable_listener : public pollable {
-  public:
-    pollable_listener(
-        const std::string& addr,
-        proton::listen_handler& l,
-        int epoll_fd,
-        epoll_container& c
-    ) :
-        pollable(socket_listen(addr), epoll_fd),
-        addr_(addr),
-        container_(c),
-        listener_(l)
-    {}
-
-    uint32_t work(uint32_t events) {
-        if (events & EPOLLRDHUP) {
-            try { listener_.on_close(); } catch (...) {}
-            return 0;
-        }
-        try {
-            int accepted = check(::accept(fd_, NULL, 0), "accept");
-            container_.add_driver(listener_.on_accept(), accepted, true);
-            return EPOLLIN;
-        } catch (const std::exception& e) {
-            listener_.on_error(e.what());
-            return 0;
-        }
-    }
-
-    std::string addr() { return addr_; }
-
-  private:
-
-    static int socket_listen(const std::string& addr) {
-        std::string msg = "listen on "+addr;
-        unique_addrinfo ainfo(addr);
-        unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
-        int yes = 1;
-        check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), 
msg);
-        check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
-        check(::listen(fd, 32), msg);
-        return fd.release();
-    }
-
-    std::string addr_;
-    std::function<proton::connection_options(const std::string&)> factory_;
-    epoll_container& container_;
-    proton::connection_options opts_;
-    proton::listen_handler& listener_;
-};
-
-
-epoll_container::epoll_container(const std::string& id)
-    : id_(id),                       epoll_fd_(check(epoll_create(1), 
"epoll_create")),
-      interrupt_fd_(check(eventfd(1, 0), "eventfd")),
-      stopping_(false), threads_(0)
-{}
-
-epoll_container::~epoll_container() {
-    try {
-        stop(proton::error_condition("exception", "container shut-down"));
-        wait();
-    } catch (...) {}
-}
-
-proton::connection epoll_container::add_driver(proton::connection_options 
opts, int fd, bool server)
-{
-    lock_guard g(lock_);
-    if (stopping_)
-        throw proton::error("container is stopping");
-    std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, 
epoll_fd_));
-    if (server)
-        eng->driver().accept(opts);
-    else
-        eng->driver().connect(opts);
-    proton::connection c = eng->driver().connection();
-    eng->notify();
-    drivers_[eng.get()] = std::move(eng);
-    return c;
-}
-
-void epoll_container::erase(pollable* e) {
-    lock_guard g(lock_);
-    if (!drivers_.erase(e)) {
-        pollable_listener* l = dynamic_cast<pollable_listener*>(e);
-        if (l)
-            listeners_.erase(l->addr());
-    }
-    idle_check(g);
-}
-
-void epoll_container::idle_check(const lock_guard&) {
-    if (stopping_  && drivers_.empty() && listeners_.empty())
-        interrupt();
-}
-
-proton::returned<proton::connection> epoll_container::connect(
-    const std::string& addr, const proton::connection_options& opts)
-{
-    std::string msg = "connect to "+addr;
-    unique_addrinfo ainfo(addr);
-    unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg));
-    check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg);
-    return make_thread_safe(add_driver(opts, fd.release(), false));
-}
-
-proton::listener epoll_container::listen(const std::string& addr, 
proton::listen_handler& lh) {
-    lock_guard g(lock_);
-    if (stopping_)
-        throw proton::error("container is stopping");
-    auto& l = listeners_[addr];
-    try {
-        l.reset(new pollable_listener(addr, lh, epoll_fd_, *this));
-        l->notify();
-        return proton::listener(*this, addr);
-    } catch (const std::exception& e) {
-        lh.on_error(e.what());
-        lh.on_close();
-        throw;
-    }
-}
-
-void epoll_container::stop_listening(const std::string& addr) {
-    lock_guard g(lock_);
-    listeners_.erase(addr);
-    idle_check(g);
-}
-
-void epoll_container::run() {
-    ++threads_;
-    try {
-        epoll_event e;
-        while(true) {
-            check(::epoll_wait(epoll_fd_, &e, 1, -1), "epoll_wait");
-            pollable* p = reinterpret_cast<pollable*>(e.data.ptr);
-            if (!p)
-                break;          // Interrupted
-            if (!p->do_work(e.events))
-                erase(p);
-        }
-    } catch (const std::exception& e) {
-        stop(proton::error_condition("exception", e.what()));
-    }
-    if (--threads_ == 0)
-        stopped_.notify_all();
-}
-
-void epoll_container::auto_stop(bool set) {
-    lock_guard g(lock_);
-    stopping_ = set;
-}
-
-void epoll_container::stop(const proton::error_condition& err) {
-    lock_guard g(lock_);
-    stop_err_ = err;
-    interrupt();
-}
-
-void epoll_container::wait() {
-    std::unique_lock<std::mutex> l(lock_);
-    stopped_.wait(l, [this]() { return this->threads_ == 0; } );
-    for (auto& eng : drivers_)
-        eng.second->driver().disconnected(stop_err_);
-    listeners_.clear();
-    drivers_.clear();
-}
-
-void epoll_container::interrupt() {
-    // Add an always-readable fd with 0 data and no ONESHOT to interrupt all 
threads.
-    epoll_event ev = {};
-    ev.events = EPOLLIN;
-    check(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev), 
"interrupt");
-}
-
-}
-
-// This is the only public function.
-std::unique_ptr<proton::container> make_mt_container(const std::string& id) {
-    return std::unique_ptr<proton::container>(new epoll_container(id));
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/mt/mt_container.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/mt_container.hpp b/examples/cpp/mt/mt_container.hpp
deleted file mode 100644
index 164fe72..0000000
--- a/examples/cpp/mt/mt_container.hpp
+++ /dev/null
@@ -1,29 +0,0 @@
-#ifndef MT_MT_CONTROLLER_HPP
-#define MT_MT_CONTROLLER_HPP
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include <proton/default_container.hpp>
-#include <memory>
-
-// Defined in whichever MT container implementation we are linked with.
-std::unique_ptr<proton::container> make_mt_container(const std::string& id);
-
-#endif // MT_MT_DEFAULT_CONTAINER.HPP


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to