PROTON-1400: WIP Use the multithreaded (mt) broker example as the main broker example, replacing the previous single-threaded (st) broker - The st broker didn't correctly respect the object access constraints from within handlers
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4eba80ee Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4eba80ee Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4eba80ee Branch: refs/heads/master Commit: 4eba80ee8527ab3145427803ad3bfdf801d79229 Parents: d168b7b Author: Andrew Stitcher <[email protected]> Authored: Tue Jan 24 23:36:03 2017 -0500 Committer: Andrew Stitcher <[email protected]> Committed: Fri Jul 21 12:50:06 2017 -0400 ---------------------------------------------------------------------- examples/cpp/broker.cpp | 352 +++++++++++--------- examples/cpp/broker.hpp | 236 -------------- examples/cpp/mt/broker.cpp | 318 ------------------ examples/cpp/mt/epoll_container.cpp | 541 ------------------------------- examples/cpp/mt/mt_container.hpp | 29 -- 5 files changed, 190 insertions(+), 1286 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp index 97ef206..e47a2a6 100644 --- a/examples/cpp/broker.cpp +++ b/examples/cpp/broker.cpp @@ -1,5 +1,4 @@ /* - * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -16,271 +15,300 @@ * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. 
- * */ #include "options.hpp" #include <proton/connection.hpp> +#include <proton/connection_options.hpp> #include <proton/container.hpp> #include <proton/default_container.hpp> #include <proton/delivery.hpp> #include <proton/error_condition.hpp> +#include <proton/listen_handler.hpp> #include <proton/listener.hpp> -#include <proton/messaging_handler.hpp> #include <proton/message.hpp> -#include <proton/receiver_options.hpp> -#include <proton/sender.hpp> +#include <proton/messaging_handler.hpp> #include <proton/sender_options.hpp> #include <proton/source_options.hpp> -#include <proton/target_options.hpp> +#include <proton/target.hpp> +#include <proton/thread_safe.hpp> #include <proton/tracker.hpp> -#include <proton/transport.hpp> -#include <proton/url.hpp> +#include <atomic> #include <deque> +#include <functional> #include <iostream> -#include <list> #include <map> +#include <mutex> #include <string> #include "fake_cpp11.hpp" -/// A simple implementation of a queue. +// Thread safe queue. +// Stores messages, notifies subscribed connections when there is data. class queue { public: - queue(const std::string &name, bool dynamic = false) : name_(name), dynamic_(dynamic) {} + queue(const std::string& name) : name_(name) {} std::string name() const { return name_; } - void subscribe(proton::sender s) { - consumers_.push_back(s); - } - - // Return true if queue can be deleted. - bool unsubscribe(proton::sender s) { - consumers_.remove(s); - return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0)); - } - - void publish(const proton::message &m) { + // Push a message onto the queue. + // If the queue was previously empty, notify subscribers it has messages. + // Called from receiver's connection. 
+ void push(const proton::message &m) { + std::lock_guard<std::mutex> g(lock_); messages_.push_back(m); - dispatch(0); - } - - void dispatch(proton::sender *s) { - while (deliver_to(s)) {} - } - - bool deliver_to(proton::sender *s) { - // Deliver to single sender if supplied, else all consumers - int count = s ? 1 : consumers_.size(); - - if (!count) return false; - - bool result = false; - sender_list::iterator it = consumers_.begin(); - - if (!s && count) { - s = &*it; + if (messages_.size() == 1) { // Non-empty, notify subscribers + for (auto cb : callbacks_) + cb(this); + callbacks_.clear(); } + } - while (messages_.size()) { - if (s->credit()) { - const proton::message& m = messages_.front(); - - s->send(m); - messages_.pop_front(); - result = true; - } - - if (--count) { - it++; - } else { - return result; - } + // If the queue is not empty, pop a message into m and return true. + // Otherwise save callback to be called when there are messages and return false. + // Called from sender's connection. + bool pop(proton::message& m, std::function<void(queue*)> callback) { + std::lock_guard<std::mutex> g(lock_); + if (messages_.empty()) { + callbacks_.push_back(callback); + return false; + } else { + m = std::move(messages_.front()); + messages_.pop_front(); + return true; } - - return false; } private: - typedef std::deque<proton::message> message_queue; - typedef std::list<proton::sender> sender_list; - - std::string name_; - bool dynamic_; - message_queue messages_; - sender_list consumers_; + const std::string name_; + std::mutex lock_; + std::deque<proton::message> messages_; + std::vector<std::function<void(queue*)> > callbacks_; }; -/// A collection of queues and queue factory, used by a broker. +/// Thread safe map of queues. class queues { public: queues() : next_id_(0) {} - virtual ~queues() {} - // Get or create a queue. 
- virtual queue &get(const std::string &address) { - if (address.empty()) { - throw std::runtime_error("empty queue name"); - } - - queue*& q = queues_[address]; - - if (!q) q = new queue(address); - - return *q; + // Get or create the named queue. + queue* get(const std::string& name) { + std::lock_guard<std::mutex> g(lock_); + auto i = queues_.insert(queue_map::value_type(name, nullptr)).first; + if (!i->second) + i->second.reset(new queue(name)); + return i->second.get(); } // Create a dynamic queue with a unique name. - virtual queue &dynamic() { + queue* dynamic() { std::ostringstream os; - os << "q" << next_id_++; - queue *q = queues_[os.str()] = new queue(os.str(), true); - - return *q; + os << "_dynamic_" << next_id_++; + return get(os.str()); } - // Delete the named queue - virtual void erase(std::string &name) { - delete queues_[name]; - queues_.erase(name); - } + private: + typedef std::map<std::string, std::unique_ptr<queue> > queue_map; - protected: - typedef std::map<std::string, queue *> queue_map; + std::mutex lock_; queue_map queues_; - int next_id_; // Use to generate unique queue IDs. + std::atomic<int> next_id_; // Use to generate unique queue IDs. }; -// A handler to implement broker logic -class broker_handler : public proton::messaging_handler { +/// Broker connection handler. Things to note: +/// +/// 1. Each handler manages a single connection. +/// +/// 2. For a *single connection* calls to proton::handler functions and calls to +/// function objects passed to proton::event_loop::inject() are serialized, +/// i.e. never called concurrently. Handlers can have per-connection state +/// without needing locks. +/// +/// 3. Handler/injected functions for *different connections* can be called +/// concurrently. Resources used by multiple connections (e.g. the queues in +/// this example) must be thread-safe. +/// +/// 4. You can 'inject' work to be done sequentially using a connection's +/// proton::event_loop. 
In this example, we create a std::function callback +/// that we pass to queues, so they can notify us when they have messages. +/// +class broker_connection_handler : public proton::messaging_handler { public: - broker_handler(queues& qs) : queues_(qs) {} + broker_connection_handler(queues& qs) : queues_(qs) {} + + void on_connection_open(proton::connection& c) OVERRIDE { + // Create the has_messages callback for queue subscriptions. + // + // Make a std::shared_ptr to a thread_safe handle for our proton::connection. + // The connection's proton::event_loop will remain valid as a shared_ptr exists. + std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c); + + // Make a lambda function to inject a call to this->has_messages() via the proton::event_loop. + // The function is bound to a shared_ptr so this is safe. If the connection has already closed + // proton::event_loop::inject() will drop the callback. + has_messages_callback_ = [this, ts_c](queue* q) mutable { + ts_c->event_loop().inject( + std::bind(&broker_connection_handler::has_messages, this, q)); + }; + + c.open(); // Accept the connection + } + // A sender sends messages from a queue to a subscriber. void on_sender_open(proton::sender &sender) OVERRIDE { - proton::source src(sender.source()); - queue *q; - if (src.dynamic()) { - q = &queues_.dynamic(); - } else if (!src.address().empty()) { - q = &queues_.get(src.address()); - } else { - sender.close(proton::error_condition("No queue address supplied")); - return; - } - sender.open(proton::sender_options().source(proton::source_options().address(q->name()))); - q->subscribe(sender); - std::cout << "broker outgoing link from " << q->name() << std::endl; + queue *q = sender.source().dynamic() ? 
+ queues_.dynamic() : queues_.get(sender.source().address()); + sender.open(proton::sender_options().source((proton::source_options().address(q->name())))); + std::cout << "sending from " << q->name() << std::endl; + } + + // We have credit to send a message. + void on_sendable(proton::sender &s) OVERRIDE { + queue* q = sender_queue(s); + if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set. + blocked_.insert(std::make_pair(q, s)); } - void on_receiver_open(proton::receiver &receiver) OVERRIDE { - std::string address = receiver.target().address(); - if (!address.empty()) { - receiver.open(proton::receiver_options().target(proton::target_options().address(address))); - std::cout << "broker incoming link to " << address << std::endl; + // A receiver receives messages from a publisher to a queue. + void on_receiver_open(proton::receiver &r) OVERRIDE { + std::string qname = r.target().address(); + if (qname == "shutdown") { + std::cout << "broker shutting down" << std::endl; + // Sending to the special "shutdown" queue stops the broker. + r.connection().container().stop( + proton::error_condition("shutdown", "stop broker")); } else { - receiver.close(proton::error_condition("No queue address supplied")); + std::cout << "receiving to " << qname << std::endl; } } - void unsubscribe(proton::sender lnk) { - std::string address = lnk.source().address(); + // A message is received. + void on_message(proton::delivery &d, proton::message &m) OVERRIDE { + std::string qname = d.receiver().target().address(); + queues_.get(qname)->push(m); + } - if (queues_.get(address).unsubscribe(lnk)) { - queues_.erase(address); - } + void on_session_close(proton::session &session) OVERRIDE { + // Erase all blocked senders that belong to session. 
+ auto predicate = [session](const proton::sender& s) { + return s.session() == session; + }; + erase_sender_if(blocked_.begin(), blocked_.end(), predicate); } void on_sender_close(proton::sender &sender) OVERRIDE { - unsubscribe(sender); + // Erase sender from the blocked set. + auto range = blocked_.equal_range(sender_queue(sender)); + auto predicate = [sender](const proton::sender& s) { return s == sender; }; + erase_sender_if(range.first, range.second, predicate); } - void on_connection_close(proton::connection &c) OVERRIDE { - remove_stale_consumers(c); + void on_error(const proton::error_condition& e) OVERRIDE { + std::cerr << "error: " << e.what() << std::endl; } - - void on_transport_close(proton::transport &t) OVERRIDE { - remove_stale_consumers(t.connection()); + // The container calls on_transport_close() last. + void on_transport_close(proton::transport&) OVERRIDE { + delete this; // All done. } - void on_transport_error(proton::transport &t) OVERRIDE { - std::cout << "broker client disconnect: " << t.error().what() << std::endl; - } + private: + typedef std::multimap<queue*, proton::sender> blocked_map; - void on_error(const proton::error_condition &c) OVERRIDE { - std::cerr << "broker error: " << c.what() << std::endl; + // Get the queue associated with a sender. + queue* sender_queue(const proton::sender& s) { + return queues_.get(s.source().address()); // Thread safe. } - void remove_stale_consumers(proton::connection connection) { - proton::sender_range r = connection.senders(); - for (proton::sender_iterator i = r.begin(); i != r.end(); ++i) { - if (i->active()) - unsubscribe(*i); - } + // Only called if we have credit. Return true if we sent a message. + bool do_send(queue* q, proton::sender &s) { + proton::message m; + bool popped = q->pop(m, has_messages_callback_); + if (popped) + s.send(m); + /// if !popped the queue has saved the callback for later. 
+ return popped; } - void on_sendable(proton::sender &s) OVERRIDE { - std::string address = s.source().address(); - - queues_.get(address).dispatch(&s); + // Called via the connection's proton::event_loop when q has messages. + // Try all the blocked senders. + void has_messages(queue* q) { + auto range = blocked_.equal_range(q); + for (auto i = range.first; i != range.second;) { + if (i->second.credit() <= 0 || do_send(q, i->second)) + i = blocked_.erase(i); // No credit or send was successful, stop blocked. + else + ++i; // have credit, didn't send, keep blocked + } } - void on_message(proton::delivery &d, proton::message &m) OVERRIDE { - std::string address = d.receiver().target().address(); - queues_.get(address).publish(m); + // Use to erase closed senders from blocked_ set. + template <class Predicate> + void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) { + for (auto i = begin; i != end; ) { + if (p(i->second)) + i = blocked_.erase(i); + else + ++i; + } } - protected: queues& queues_; + blocked_map blocked_; + std::function<void(queue*)> has_messages_callback_; + proton::connection connection_; }; -// The broker class broker { public: - broker(const std::string& url) : handler_(url, queues_) {} + broker(const std::string addr) : + container_("mt_broker"), listener_(queues_) + { + container_.listen(addr, listener_); + std::cout << "broker listening on " << addr << std::endl; + } - proton::messaging_handler& handler() { return handler_; } + void run() { + container_.run(/* std::thread::hardware_concurrency() */); + } private: - class my_handler : public broker_handler { - public: - my_handler(const std::string& u, queues& qs) : broker_handler(qs), url_(u) {} + struct listener : public proton::listen_handler { + listener(queues& qs) : queues_(qs) {} - void on_container_start(proton::container &c) OVERRIDE { - c.listen(url_); - std::cout << "broker listening on " << url_ << std::endl; + proton::connection_options 
on_accept(proton::listener&) OVERRIDE{ + return proton::connection_options().handler(*(new broker_connection_handler(queues_))); } - private: - const std::string& url_; + void on_error(proton::listener&, const std::string& s) OVERRIDE { + std::cerr << "listen error: " << s << std::endl; + throw std::runtime_error(s); + } + queues& queues_; }; - private: queues queues_; - my_handler handler_; + proton::container container_; + listener listener_; }; int main(int argc, char **argv) { - std::string url("0.0.0.0"); + // Command line options + std::string address("0.0.0.0"); example::options opts(argc, argv); - opts.add_value(url, 'a', "address", "listen on URL", "URL"); + opts.add_value(address, 'a', "address", "listen on URL", "URL"); try { opts.parse(); - - broker b(url); - proton::default_container(b.handler()).run(); - + broker(address).run(); return 0; } catch (const example::bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; + std::cerr << "broker shutdown: " << e.what() << std::endl; } - return 1; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/broker.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp deleted file mode 100644 index 953713f..0000000 --- a/examples/cpp/broker.hpp +++ /dev/null @@ -1,236 +0,0 @@ -#ifndef BROKER_HPP -#define BROKER_HPP - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/// @file -/// -/// Common code used by different broker examples. -/// -/// The examples add functionality as needed, this helps to make it -/// easier to see the important differences between the examples. - -#include <proton/connection.hpp> -#include <proton/delivery.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/message.hpp> -#include <proton/sasl.hpp> -#include <proton/sender.hpp> -#include <proton/tracker.hpp> -#include <proton/transport.hpp> -#include <proton/sender_options.hpp> -#include <proton/receiver_options.hpp> -#include <proton/source_options.hpp> -#include <proton/target_options.hpp> - -#include <iostream> -#include <deque> -#include <map> -#include <list> -#include <sstream> - -/// A simple implementation of a queue. -class queue { - public: - queue(const std::string &name, bool dynamic = false) : name_(name), dynamic_(dynamic) {} - - std::string name() const { return name_; } - - void subscribe(proton::sender s) { - consumers_.push_back(s); - } - - // Return true if queue can be deleted. - bool unsubscribe(proton::sender s) { - consumers_.remove(s); - return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0)); - } - - void publish(const proton::message &m) { - messages_.push_back(m); - dispatch(0); - } - - void dispatch(proton::sender *s) { - while (deliver_to(s)) {} - } - - bool deliver_to(proton::sender *s) { - // Deliver to single sender if supplied, else all consumers - int count = s ? 
1 : consumers_.size(); - - if (!count) return false; - - bool result = false; - sender_list::iterator it = consumers_.begin(); - - if (!s && count) { - s = &*it; - } - - while (messages_.size()) { - if (s->credit()) { - const proton::message& m = messages_.front(); - - s->send(m); - messages_.pop_front(); - result = true; - } - - if (--count) { - it++; - } else { - return result; - } - } - - return false; - } - - private: - typedef std::deque<proton::message> message_queue; - typedef std::list<proton::sender> sender_list; - - std::string name_; - bool dynamic_; - message_queue messages_; - sender_list consumers_; -}; - -/// A collection of queues and queue factory, used by a broker. -class queues { - public: - queues() : next_id_(0) {} - virtual ~queues() {} - - // Get or create a queue. - virtual queue &get(const std::string &address = std::string()) { - if (address.empty()) { - throw std::runtime_error("empty queue name"); - } - - queue*& q = queues_[address]; - - if (!q) q = new queue(address); - - return *q; - } - - // Create a dynamic queue with a unique name. - virtual queue &dynamic() { - std::ostringstream os; - os << "q" << next_id_++; - queue *q = queues_[os.str()] = new queue(os.str(), true); - - return *q; - } - - // Delete the named queue - virtual void erase(std::string &name) { - delete queues_[name]; - queues_.erase(name); - } - - protected: - typedef std::map<std::string, queue *> queue_map; - queue_map queues_; - int next_id_; // Use to generate unique queue IDs. -}; - -#include <proton/config.hpp> - -/** Common handler logic for brokers. 
*/ -class broker_handler : public proton::messaging_handler { - public: - broker_handler(queues& qs) : queues_(qs) {} - - void on_transport_open(proton::transport &t) OVERRIDE { - std::cout << "Connection from user: " << t.sasl().user() << " (mechanism: " << t.sasl().mech() << ")" << std::endl; - } - - void on_sender_open(proton::sender &sender) OVERRIDE { - proton::source src(sender.source()); - queue &q = src.dynamic() ? - queues_.dynamic() : queues_.get(src.address()); - sender.open(proton::sender_options().source(proton::source_options().address(q.name()))); - q.subscribe(sender); - std::cout << "broker outgoing link from " << q.name() << std::endl; - } - - void on_receiver_open(proton::receiver &receiver) OVERRIDE { - std::string address = receiver.target().address(); - if (!address.empty()) { - receiver.open(proton::receiver_options().target(proton::target_options().address(address))); - std::cout << "broker incoming link to " << address << std::endl; - } - } - - void unsubscribe(proton::sender lnk) { - std::string address = lnk.source().address(); - - if (queues_.get(address).unsubscribe(lnk)) { - queues_.erase(address); - } - } - - void on_sender_close(proton::sender &sender) OVERRIDE { - unsubscribe(sender); - } - - void on_connection_close(proton::connection &c) OVERRIDE { - remove_stale_consumers(c); - } - - void on_transport_close(proton::transport &t) OVERRIDE { - remove_stale_consumers(t.connection()); - } - - void on_transport_error(proton::transport &t) OVERRIDE { - std::cout << "broker client disconnect: " << t.error().what() << std::endl; - } - - void on_error(const proton::error_condition &c) OVERRIDE { - std::cerr << "broker error: " << c.what() << std::endl; - } - - void remove_stale_consumers(proton::connection connection) { - proton::sender_range sr = connection.senders(); - for (proton::sender_iterator i = sr.begin(); i != sr.end(); ++i) { - if (i->active()) - unsubscribe(*i); - } - } - - void on_sendable(proton::sender &s) OVERRIDE { - 
std::string address = s.source().address(); - - queues_.get(address).dispatch(&s); - } - - void on_message(proton::delivery &d, proton::message &m) OVERRIDE { - std::string address = d.receiver().target().address(); - queues_.get(address).publish(m); - } - - protected: - queues& queues_; -}; - -#endif // BROKER_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/mt/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp deleted file mode 100644 index 83b7005..0000000 --- a/examples/cpp/mt/broker.cpp +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "../options.hpp" -#include "mt_container.hpp" - -#include <proton/connection.hpp> -#include <proton/connection_options.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/delivery.hpp> -#include <proton/error_condition.hpp> -#include <proton/listen_handler.hpp> -#include <proton/listener.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/sender_options.hpp> -#include <proton/source_options.hpp> -#include <proton/target.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> - -#include <atomic> -#include <deque> -#include <functional> -#include <iostream> -#include <map> -#include <mutex> -#include <thread> - -#include "../fake_cpp11.hpp" - -// Thread safe queue. -// Stores messages, notifies subscribed connections when there is data. -class queue { - public: - queue(const std::string& name) : name_(name) {} - - std::string name() const { return name_; } - - // Push a message onto the queue. - // If the queue was previously empty, notify subscribers it has messages. - // Called from receiver's connection. - void push(const proton::message &m) { - std::lock_guard<std::mutex> g(lock_); - messages_.push_back(m); - if (messages_.size() == 1) { // Non-empty, notify subscribers - for (auto cb : callbacks_) - cb(this); - callbacks_.clear(); - } - } - - // If the queue is not empty, pop a message into m and return true. - // Otherwise save callback to be called when there are messages and return false. - // Called from sender's connection. 
- bool pop(proton::message& m, std::function<void(queue*)> callback) { - std::lock_guard<std::mutex> g(lock_); - if (messages_.empty()) { - callbacks_.push_back(callback); - return false; - } else { - m = std::move(messages_.front()); - messages_.pop_front(); - return true; - } - } - - private: - const std::string name_; - std::mutex lock_; - std::deque<proton::message> messages_; - std::vector<std::function<void(queue*)> > callbacks_; -}; - -/// Thread safe map of queues. -class queues { - public: - queues() : next_id_(0) {} - - // Get or create the named queue. - queue* get(const std::string& name) { - std::lock_guard<std::mutex> g(lock_); - auto i = queues_.insert(queue_map::value_type(name, nullptr)).first; - if (!i->second) - i->second.reset(new queue(name)); - return i->second.get(); - } - - // Create a dynamic queue with a unique name. - queue* dynamic() { - std::ostringstream os; - os << "_dynamic_" << next_id_++; - return get(os.str()); - } - - private: - typedef std::map<std::string, std::unique_ptr<queue> > queue_map; - - std::mutex lock_; - queue_map queues_; - std::atomic<int> next_id_; // Use to generate unique queue IDs. -}; - -/// Broker connection handler. Things to note: -/// -/// 1. Each handler manages a single connection. -/// -/// 2. For a *single connection* calls to proton::handler functions and calls to -/// function objects passed to proton::event_loop::inject() are serialized, -/// i.e. never called concurrently. Handlers can have per-connection state -/// without needing locks. -/// -/// 3. Handler/injected functions for *different connections* can be called -/// concurrently. Resources used by multiple connections (e.g. the queues in -/// this example) must be thread-safe. -/// -/// 4. You can 'inject' work to be done sequentially using a connection's -/// proton::event_loop. In this example, we create a std::function callback -/// that we pass to queues, so they can notify us when they have messages. 
-/// -class broker_connection_handler : public proton::messaging_handler { - public: - broker_connection_handler(queues& qs) : queues_(qs) {} - - void on_connection_open(proton::connection& c) OVERRIDE { - // Create the has_messages callback for queue subscriptions. - // - // Make a std::shared_ptr to a thread_safe handle for our proton::connection. - // The connection's proton::event_loop will remain valid as a shared_ptr exists. - std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c); - - // Make a lambda function to inject a call to this->has_messages() via the proton::event_loop. - // The function is bound to a shared_ptr so this is safe. If the connection has already closed - // proton::event_loop::inject() will drop the callback. - has_messages_callback_ = [this, ts_c](queue* q) mutable { - ts_c->event_loop()->inject( - std::bind(&broker_connection_handler::has_messages, this, q)); - }; - - c.open(); // Accept the connection - } - - // A sender sends messages from a queue to a subscriber. - void on_sender_open(proton::sender &sender) OVERRIDE { - queue *q = sender.source().dynamic() ? - queues_.dynamic() : queues_.get(sender.source().address()); - sender.open(proton::sender_options().source((proton::source_options().address(q->name())))); - std::cout << "sending from " << q->name() << std::endl; - } - - // We have credit to send a message. - void on_sendable(proton::sender &s) OVERRIDE { - queue* q = sender_queue(s); - if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set. - blocked_.insert(std::make_pair(q, s)); - } - - // A receiver receives messages from a publisher to a queue. - void on_receiver_open(proton::receiver &r) OVERRIDE { - std::string qname = r.target().address(); - if (qname == "shutdown") { - std::cout << "broker shutting down" << std::endl; - // Sending to the special "shutdown" queue stops the broker. 
- r.connection().container().stop( - proton::error_condition("shutdown", "stop broker")); - } else { - std::cout << "receiving to " << qname << std::endl; - } - } - - // A message is received. - void on_message(proton::delivery &d, proton::message &m) OVERRIDE { - std::string qname = d.receiver().target().address(); - queues_.get(qname)->push(m); - } - - void on_session_close(proton::session &session) OVERRIDE { - // Erase all blocked senders that belong to session. - auto predicate = [session](const proton::sender& s) { - return s.session() == session; - }; - erase_sender_if(blocked_.begin(), blocked_.end(), predicate); - } - - void on_sender_close(proton::sender &sender) OVERRIDE { - // Erase sender from the blocked set. - auto range = blocked_.equal_range(sender_queue(sender)); - auto predicate = [sender](const proton::sender& s) { return s == sender; }; - erase_sender_if(range.first, range.second, predicate); - } - - void on_error(const proton::error_condition& e) OVERRIDE { - std::cerr << "error: " << e.what() << std::endl; - } - // The container calls on_transport_close() last. - void on_transport_close(proton::transport&) OVERRIDE { - delete this; // All done. - } - - private: - typedef std::multimap<queue*, proton::sender> blocked_map; - - // Get the queue associated with a sender. - queue* sender_queue(const proton::sender& s) { - return queues_.get(s.source().address()); // Thread safe. - } - - // Only called if we have credit. Return true if we sent a message. - bool do_send(queue* q, proton::sender &s) { - proton::message m; - bool popped = q->pop(m, has_messages_callback_); - if (popped) - s.send(m); - /// if !popped the queue has saved the callback for later. - return popped; - } - - // Called via the connection's proton::event_loop when q has messages. - // Try all the blocked senders. 
- void has_messages(queue* q) { - auto range = blocked_.equal_range(q); - for (auto i = range.first; i != range.second;) { - if (i->second.credit() <= 0 || do_send(q, i->second)) - i = blocked_.erase(i); // No credit or send was successful, stop blocked. - else - ++i; // have credit, didn't send, keep blocked - } - } - - // Use to erase closed senders from blocked_ set. - template <class Predicate> - void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) { - for (auto i = begin; i != end; ) { - if (p(i->second)) - i = blocked_.erase(i); - else - ++i; - } - } - - queues& queues_; - blocked_map blocked_; - std::function<void(queue*)> has_messages_callback_; - proton::connection connection_; -}; - - -class broker { - public: - broker(const std::string addr) : - container_(make_mt_container("mt_broker")), listener_(queues_) - { - container_->listen(addr, listener_); - std::cout << "broker listening on " << addr << std::endl; - } - - void run() { - std::vector<std::thread> threads(std::thread::hardware_concurrency()-1); - for (auto& t : threads) - t = std::thread(&proton::container::run, container_.get()); - container_->run(); // Use this thread too. 
- for (auto& t : threads) - t.join(); - } - - private: - struct listener : public proton::listen_handler { - listener(queues& qs) : queues_(qs) {} - - proton::connection_options on_accept() OVERRIDE{ - return proton::connection_options().handler(*(new broker_connection_handler(queues_))); - } - - void on_error(const std::string& s) OVERRIDE { - std::cerr << "listen error: " << s << std::endl; - throw std::runtime_error(s); - } - queues& queues_; - }; - - queues queues_; - std::unique_ptr<proton::container> container_; - listener listener_; -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("0.0.0.0"); - example::options opts(argc, argv); - opts.add_value(address, 'a', "address", "listen on URL", "URL"); - try { - opts.parse(); - broker(address).run(); - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << "broker shutdown: " << e.what() << std::endl; - } - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/mt/epoll_container.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp deleted file mode 100644 index 5643fcc..0000000 --- a/examples/cpp/mt/epoll_container.cpp +++ /dev/null @@ -1,541 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "mt_container.hpp" - -#include <proton/default_container.hpp> -#include <proton/event_loop.hpp> -#include <proton/listen_handler.hpp> -#include <proton/url.hpp> - -#include <proton/io/container_impl_base.hpp> -#include <proton/io/connection_driver.hpp> -#include <proton/io/link_namer.hpp> - -#include <atomic> -#include <memory> -#include <mutex> -#include <condition_variable> -#include <thread> -#include <set> -#include <sstream> -#include <system_error> - -// Linux native IO -#include <assert.h> -#include <errno.h> -#include <fcntl.h> -#include <netdb.h> -#include <sys/epoll.h> -#include <sys/eventfd.h> -#include <unistd.h> - -#include "../fake_cpp11.hpp" - -// Private implementation -namespace { - - -using lock_guard = std::lock_guard<std::mutex>; - -// Get string from errno -std::string errno_str(const std::string& msg) { - return std::system_error(errno, std::system_category(), msg).what(); -} - -// Throw proton::error(errno_str(msg)) if result < 0 -int check(int result, const std::string& msg) { - if (result < 0) - throw proton::error(errno_str(msg)); - return result; -} - -// Wrapper for getaddrinfo() that cleans up in destructor. 
-class unique_addrinfo { - public: - unique_addrinfo(const std::string& addr) : addrinfo_(0) { - proton::url u(addr); - int result = ::getaddrinfo(char_p(u.host()), char_p(u.port()), 0, &addrinfo_); - if (result) - throw proton::error(std::string("bad address: ") + gai_strerror(result)); - } - ~unique_addrinfo() { if (addrinfo_) ::freeaddrinfo(addrinfo_); } - - ::addrinfo* operator->() const { return addrinfo_; } - - private: - static const char* char_p(const std::string& s) { return s.empty() ? 0 : s.c_str(); } - ::addrinfo *addrinfo_; -}; - -// File descriptor wrapper that calls ::close in destructor. -class unique_fd { - public: - unique_fd(int fd) : fd_(fd) {} - ~unique_fd() { if (fd_ >= 0) ::close(fd_); } - operator int() const { return fd_; } - int release() { int ret = fd_; fd_ = -1; return ret; } - - protected: - int fd_; -}; - -class pollable; -class pollable_driver; -class pollable_listener; - -class epoll_container : public proton::io::container_impl_base { - public: - epoll_container(const std::string& id); - ~epoll_container(); - - // Pull in base class functions here so that name search finds all the overloads - using standard_container::stop; - using standard_container::connect; - using standard_container::listen; - - proton::returned<proton::connection> connect( - const std::string& addr, const proton::connection_options& opts) OVERRIDE; - - proton::listener listen(const std::string& addr, proton::listen_handler&) OVERRIDE; - - void stop_listening(const std::string& addr) OVERRIDE; - - void run() OVERRIDE; - void auto_stop(bool) OVERRIDE; - void stop(const proton::error_condition& err) OVERRIDE; - - std::string id() const OVERRIDE { return id_; } - - // Functions used internally. - proton::connection add_driver(proton::connection_options opts, int fd, bool server); - void erase(pollable*); - - // Link names must be unique per container. - // Generate unique names with a simple atomic counter. 
- class atomic_link_namer : public proton::io::link_namer { - public: - std::string link_name() { - std::ostringstream o; - o << std::hex << ++count_; - return o.str(); - } - private: - std::atomic<int> count_; - }; - - // TODO aconway 2016-06-07: Unfinished - void schedule(proton::duration, std::function<void()>) OVERRIDE { throw std::logic_error("not implemented"); } - void schedule(proton::duration, proton::void_function0&) OVERRIDE { throw std::logic_error("not implemented"); } - atomic_link_namer link_namer; - - private: - template <class T> void store(T& v, const T& x) const { lock_guard g(lock_); v = x; } - - void idle_check(const lock_guard&); - void interrupt(); - void wait(); - - const std::string id_; - const unique_fd epoll_fd_; - const unique_fd interrupt_fd_; - - mutable std::mutex lock_; - - proton::connection_options options_; - std::map<std::string, std::unique_ptr<pollable_listener> > listeners_; - std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_; - - std::condition_variable stopped_; - bool stopping_; - proton::error_condition stop_err_; - std::atomic<size_t> threads_; -}; - -// Base class for pollable file-descriptors. Manages epoll interaction, -// subclasses implement virtual work() to do their serialized work. -class pollable { - public: - pollable(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(false), working_(false) - { - int flags = check(::fcntl(fd, F_GETFL, 0), "non-blocking"); - check(::fcntl(fd, F_SETFL, flags | O_NONBLOCK), "non-blocking"); - ::epoll_event ev = {}; - ev.data.ptr = this; - ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev); - } - - virtual ~pollable() { - ::epoll_event ev = {}; - ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); // Ignore errors. - } - - bool do_work(uint32_t events) { - { - lock_guard g(lock_); - if (working_) - return true; // Another thread is already working. 
- working_ = true; - notified_ = false; - } - uint32_t new_events = work(events); // Serialized, outside the lock. - if (new_events) { - lock_guard g(lock_); - rearm(notified_ ? EPOLLIN|EPOLLOUT : new_events); - } - return new_events; - } - - // Called from any thread to wake up the connection handler. - void notify() { - lock_guard g(lock_); - if (!notified_) { - notified_ = true; - if (!working_) // No worker thread, rearm now. - rearm(EPOLLIN|EPOLLOUT); - } - } - - protected: - - // Subclass implements work. - // Returns epoll events to re-enable or 0 if finished. - virtual uint32_t work(uint32_t events) = 0; - - const unique_fd fd_; - const int epoll_fd_; - - private: - - void rearm(uint32_t events) { - epoll_event ev; - ev.data.ptr = this; - ev.events = EPOLLONESHOT | events; - check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), "re-arm epoll"); - working_ = false; - } - - std::mutex lock_; - bool notified_; - bool working_; -}; - -class epoll_event_loop : public proton::event_loop { - public: - typedef std::vector<std::function<void()> > jobs; - - epoll_event_loop(pollable& p) : pollable_(p), closed_(false) {} - - bool inject(std::function<void()> f) OVERRIDE { - // Note this is an unbounded work queue. - // A resource-safe implementation should be bounded. - lock_guard g(lock_); - if (closed_) - return false; - jobs_.push_back(f); - pollable_.notify(); - return true; - } - - bool inject(proton::void_function0& f) OVERRIDE { - return inject([&f]() { f(); }); - } - - jobs pop_all() { - lock_guard g(lock_); - return std::move(jobs_); - } - - void close() { - lock_guard g(lock_); - closed_ = true; - } - - private: - std::mutex lock_; - pollable& pollable_; - jobs jobs_; - bool closed_; -}; - -// Handle epoll wakeups for a connection_driver. 
-class pollable_driver : public pollable { - public: - pollable_driver(epoll_container& c, int fd, int epoll_fd) : - pollable(fd, epoll_fd), - loop_(new epoll_event_loop(*this)), - driver_(c, loop_) - { - proton::connection conn = driver_.connection(); - proton::io::set_link_namer(conn, c.link_namer); - } - - ~pollable_driver() { - loop_->close(); // No calls to notify() after this. - driver_.dispatch(); // Run any final events. - try { write(); } catch(...) {} // Write connection close if we can. - for (auto f : loop_->pop_all()) {// Run final queued work for side-effects. - try { f(); } catch(...) {} - } - } - - uint32_t work(uint32_t events) { - try { - bool can_read = events & EPOLLIN, can_write = events & EPOLLOUT; - do { - can_write = can_write && write(); - can_read = can_read && read(); - for (auto f : loop_->pop_all()) // Run queued work - f(); - driver_.dispatch(); - } while (can_read || can_write); - return (driver_.read_buffer().size ? EPOLLIN:0) | - (driver_.write_buffer().size ? 
EPOLLOUT:0); - } catch (const std::exception& e) { - driver_.disconnected(proton::error_condition("exception", e.what())); - } - return 0; // Ending - } - - proton::io::connection_driver& driver() { return driver_; } - - private: - static bool try_again(int e) { - // These errno values from read or write mean "try again" - return (e == EAGAIN || e == EWOULDBLOCK || e == EINTR); - } - - bool write() { - proton::io::const_buffer wbuf(driver_.write_buffer()); - if (wbuf.size) { - ssize_t n = ::write(fd_, wbuf.data, wbuf.size); - if (n > 0) { - driver_.write_done(n); - return true; - } else if (n < 0 && !try_again(errno)) { - check(n, "write"); - } - } - return false; - } - - bool read() { - proton::io::mutable_buffer rbuf(driver_.read_buffer()); - if (rbuf.size) { - ssize_t n = ::read(fd_, rbuf.data, rbuf.size); - if (n > 0) { - driver_.read_done(n); - return true; - } - else if (n == 0) - driver_.read_close(); - else if (!try_again(errno)) - check(n, "read"); - } - return false; - } - - // Lifecycle note: loop_ belongs to the proton::connection, which can live - // longer than the driver if the application holds a reference to it, we - // disconnect ourselves with loop_->close() in ~connection_driver() - epoll_event_loop* loop_; - proton::io::connection_driver driver_; -}; - -// A pollable listener fd that creates pollable_driver for incoming connections. -class pollable_listener : public pollable { - public: - pollable_listener( - const std::string& addr, - proton::listen_handler& l, - int epoll_fd, - epoll_container& c - ) : - pollable(socket_listen(addr), epoll_fd), - addr_(addr), - container_(c), - listener_(l) - {} - - uint32_t work(uint32_t events) { - if (events & EPOLLRDHUP) { - try { listener_.on_close(); } catch (...) 
{} - return 0; - } - try { - int accepted = check(::accept(fd_, NULL, 0), "accept"); - container_.add_driver(listener_.on_accept(), accepted, true); - return EPOLLIN; - } catch (const std::exception& e) { - listener_.on_error(e.what()); - return 0; - } - } - - std::string addr() { return addr_; } - - private: - - static int socket_listen(const std::string& addr) { - std::string msg = "listen on "+addr; - unique_addrinfo ainfo(addr); - unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg)); - int yes = 1; - check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), msg); - check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg); - check(::listen(fd, 32), msg); - return fd.release(); - } - - std::string addr_; - std::function<proton::connection_options(const std::string&)> factory_; - epoll_container& container_; - proton::connection_options opts_; - proton::listen_handler& listener_; -}; - - -epoll_container::epoll_container(const std::string& id) - : id_(id), epoll_fd_(check(epoll_create(1), "epoll_create")), - interrupt_fd_(check(eventfd(1, 0), "eventfd")), - stopping_(false), threads_(0) -{} - -epoll_container::~epoll_container() { - try { - stop(proton::error_condition("exception", "container shut-down")); - wait(); - } catch (...) 
{} -} - -proton::connection epoll_container::add_driver(proton::connection_options opts, int fd, bool server) -{ - lock_guard g(lock_); - if (stopping_) - throw proton::error("container is stopping"); - std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, epoll_fd_)); - if (server) - eng->driver().accept(opts); - else - eng->driver().connect(opts); - proton::connection c = eng->driver().connection(); - eng->notify(); - drivers_[eng.get()] = std::move(eng); - return c; -} - -void epoll_container::erase(pollable* e) { - lock_guard g(lock_); - if (!drivers_.erase(e)) { - pollable_listener* l = dynamic_cast<pollable_listener*>(e); - if (l) - listeners_.erase(l->addr()); - } - idle_check(g); -} - -void epoll_container::idle_check(const lock_guard&) { - if (stopping_ && drivers_.empty() && listeners_.empty()) - interrupt(); -} - -proton::returned<proton::connection> epoll_container::connect( - const std::string& addr, const proton::connection_options& opts) -{ - std::string msg = "connect to "+addr; - unique_addrinfo ainfo(addr); - unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg)); - check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg); - return make_thread_safe(add_driver(opts, fd.release(), false)); -} - -proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) { - lock_guard g(lock_); - if (stopping_) - throw proton::error("container is stopping"); - auto& l = listeners_[addr]; - try { - l.reset(new pollable_listener(addr, lh, epoll_fd_, *this)); - l->notify(); - return proton::listener(*this, addr); - } catch (const std::exception& e) { - lh.on_error(e.what()); - lh.on_close(); - throw; - } -} - -void epoll_container::stop_listening(const std::string& addr) { - lock_guard g(lock_); - listeners_.erase(addr); - idle_check(g); -} - -void epoll_container::run() { - ++threads_; - try { - epoll_event e; - while(true) { - check(::epoll_wait(epoll_fd_, &e, 1, -1), "epoll_wait"); - pollable* p = 
reinterpret_cast<pollable*>(e.data.ptr); - if (!p) - break; // Interrupted - if (!p->do_work(e.events)) - erase(p); - } - } catch (const std::exception& e) { - stop(proton::error_condition("exception", e.what())); - } - if (--threads_ == 0) - stopped_.notify_all(); -} - -void epoll_container::auto_stop(bool set) { - lock_guard g(lock_); - stopping_ = set; -} - -void epoll_container::stop(const proton::error_condition& err) { - lock_guard g(lock_); - stop_err_ = err; - interrupt(); -} - -void epoll_container::wait() { - std::unique_lock<std::mutex> l(lock_); - stopped_.wait(l, [this]() { return this->threads_ == 0; } ); - for (auto& eng : drivers_) - eng.second->driver().disconnected(stop_err_); - listeners_.clear(); - drivers_.clear(); -} - -void epoll_container::interrupt() { - // Add an always-readable fd with 0 data and no ONESHOT to interrupt all threads. - epoll_event ev = {}; - ev.events = EPOLLIN; - check(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev), "interrupt"); -} - -} - -// This is the only public function. -std::unique_ptr<proton::container> make_mt_container(const std::string& id) { - return std::unique_ptr<proton::container>(new epoll_container(id)); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4eba80ee/examples/cpp/mt/mt_container.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/mt_container.hpp b/examples/cpp/mt/mt_container.hpp deleted file mode 100644 index 164fe72..0000000 --- a/examples/cpp/mt/mt_container.hpp +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef MT_MT_CONTROLLER_HPP -#define MT_MT_CONTROLLER_HPP - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <proton/default_container.hpp> -#include <memory> - -// Defined in whichever MT container implementation we are linked with. -std::unique_ptr<proton::container> make_mt_container(const std::string& id); - -#endif // MT_MT_DEFAULT_CONTAINER.HPP --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
