http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/helloworld_direct.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/helloworld_direct.cpp b/examples/cpp/helloworld_direct.cpp deleted file mode 100644 index b3a1af8..0000000 --- a/examples/cpp/helloworld_direct.cpp +++ /dev/null @@ -1,83 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include <proton/connection.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/listener.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/sender.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> - -#include <iostream> - -#include "fake_cpp11.hpp" - -class hello_world_direct : public proton::messaging_handler { - private: - std::string url; - proton::listener listener; - - public: - hello_world_direct(const std::string& u) : url(u) {} - - void on_container_start(proton::container &c) OVERRIDE { - listener = c.listen(url); - c.open_sender(url); - } - - void on_sendable(proton::sender &s) OVERRIDE { - proton::message m("Hello World!"); - s.send(m); - s.close(); - } - - void on_message(proton::delivery &, proton::message &m) OVERRIDE { - std::cout << m.body() << std::endl; - } - - void on_tracker_accept(proton::tracker &t) OVERRIDE { - t.connection().close(); - } - - void on_connection_close(proton::connection&) OVERRIDE { - listener.stop(); - } -}; - -int main(int argc, char **argv) { - try { - // Pick an "unusual" port since we are going to be talking to - // ourselves, not a broker. - std::string url = argc > 1 ? argv[1] : "127.0.0.1:8888/examples"; - - hello_world_direct hwd(url); - proton::default_container(hwd).run(); - - return 0; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/mt/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/broker.cpp b/examples/cpp/mt/broker.cpp deleted file mode 100644 index 83b7005..0000000 --- a/examples/cpp/mt/broker.cpp +++ /dev/null @@ -1,318 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "../options.hpp" -#include "mt_container.hpp" - -#include <proton/connection.hpp> -#include <proton/connection_options.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/delivery.hpp> -#include <proton/error_condition.hpp> -#include <proton/listen_handler.hpp> -#include <proton/listener.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/sender_options.hpp> -#include <proton/source_options.hpp> -#include <proton/target.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> - -#include <atomic> -#include <deque> -#include <functional> -#include <iostream> -#include <map> -#include <mutex> -#include <thread> - -#include "../fake_cpp11.hpp" - -// Thread safe queue. 
-// Stores messages, notifies subscribed connections when there is data. -class queue { - public: - queue(const std::string& name) : name_(name) {} - - std::string name() const { return name_; } - - // Push a message onto the queue. - // If the queue was previously empty, notify subscribers it has messages. - // Called from receiver's connection. - void push(const proton::message &m) { - std::lock_guard<std::mutex> g(lock_); - messages_.push_back(m); - if (messages_.size() == 1) { // Non-empty, notify subscribers - for (auto cb : callbacks_) - cb(this); - callbacks_.clear(); - } - } - - // If the queue is not empty, pop a message into m and return true. - // Otherwise save callback to be called when there are messages and return false. - // Called from sender's connection. - bool pop(proton::message& m, std::function<void(queue*)> callback) { - std::lock_guard<std::mutex> g(lock_); - if (messages_.empty()) { - callbacks_.push_back(callback); - return false; - } else { - m = std::move(messages_.front()); - messages_.pop_front(); - return true; - } - } - - private: - const std::string name_; - std::mutex lock_; - std::deque<proton::message> messages_; - std::vector<std::function<void(queue*)> > callbacks_; -}; - -/// Thread safe map of queues. -class queues { - public: - queues() : next_id_(0) {} - - // Get or create the named queue. - queue* get(const std::string& name) { - std::lock_guard<std::mutex> g(lock_); - auto i = queues_.insert(queue_map::value_type(name, nullptr)).first; - if (!i->second) - i->second.reset(new queue(name)); - return i->second.get(); - } - - // Create a dynamic queue with a unique name. - queue* dynamic() { - std::ostringstream os; - os << "_dynamic_" << next_id_++; - return get(os.str()); - } - - private: - typedef std::map<std::string, std::unique_ptr<queue> > queue_map; - - std::mutex lock_; - queue_map queues_; - std::atomic<int> next_id_; // Use to generate unique queue IDs. -}; - -/// Broker connection handler. 
Things to note: -/// -/// 1. Each handler manages a single connection. -/// -/// 2. For a *single connection* calls to proton::handler functions and calls to -/// function objects passed to proton::event_loop::inject() are serialized, -/// i.e. never called concurrently. Handlers can have per-connection state -/// without needing locks. -/// -/// 3. Handler/injected functions for *different connections* can be called -/// concurrently. Resources used by multiple connections (e.g. the queues in -/// this example) must be thread-safe. -/// -/// 4. You can 'inject' work to be done sequentially using a connection's -/// proton::event_loop. In this example, we create a std::function callback -/// that we pass to queues, so they can notify us when they have messages. -/// -class broker_connection_handler : public proton::messaging_handler { - public: - broker_connection_handler(queues& qs) : queues_(qs) {} - - void on_connection_open(proton::connection& c) OVERRIDE { - // Create the has_messages callback for queue subscriptions. - // - // Make a std::shared_ptr to a thread_safe handle for our proton::connection. - // The connection's proton::event_loop will remain valid as a shared_ptr exists. - std::shared_ptr<proton::thread_safe<proton::connection> > ts_c = make_shared_thread_safe(c); - - // Make a lambda function to inject a call to this->has_messages() via the proton::event_loop. - // The function is bound to a shared_ptr so this is safe. If the connection has already closed - // proton::event_loop::inject() will drop the callback. - has_messages_callback_ = [this, ts_c](queue* q) mutable { - ts_c->event_loop()->inject( - std::bind(&broker_connection_handler::has_messages, this, q)); - }; - - c.open(); // Accept the connection - } - - // A sender sends messages from a queue to a subscriber. - void on_sender_open(proton::sender &sender) OVERRIDE { - queue *q = sender.source().dynamic() ? 
- queues_.dynamic() : queues_.get(sender.source().address()); - sender.open(proton::sender_options().source((proton::source_options().address(q->name())))); - std::cout << "sending from " << q->name() << std::endl; - } - - // We have credit to send a message. - void on_sendable(proton::sender &s) OVERRIDE { - queue* q = sender_queue(s); - if (!do_send(q, s)) // Queue is empty, save ourselves in the blocked set. - blocked_.insert(std::make_pair(q, s)); - } - - // A receiver receives messages from a publisher to a queue. - void on_receiver_open(proton::receiver &r) OVERRIDE { - std::string qname = r.target().address(); - if (qname == "shutdown") { - std::cout << "broker shutting down" << std::endl; - // Sending to the special "shutdown" queue stops the broker. - r.connection().container().stop( - proton::error_condition("shutdown", "stop broker")); - } else { - std::cout << "receiving to " << qname << std::endl; - } - } - - // A message is received. - void on_message(proton::delivery &d, proton::message &m) OVERRIDE { - std::string qname = d.receiver().target().address(); - queues_.get(qname)->push(m); - } - - void on_session_close(proton::session &session) OVERRIDE { - // Erase all blocked senders that belong to session. - auto predicate = [session](const proton::sender& s) { - return s.session() == session; - }; - erase_sender_if(blocked_.begin(), blocked_.end(), predicate); - } - - void on_sender_close(proton::sender &sender) OVERRIDE { - // Erase sender from the blocked set. - auto range = blocked_.equal_range(sender_queue(sender)); - auto predicate = [sender](const proton::sender& s) { return s == sender; }; - erase_sender_if(range.first, range.second, predicate); - } - - void on_error(const proton::error_condition& e) OVERRIDE { - std::cerr << "error: " << e.what() << std::endl; - } - // The container calls on_transport_close() last. - void on_transport_close(proton::transport&) OVERRIDE { - delete this; // All done. 
- } - - private: - typedef std::multimap<queue*, proton::sender> blocked_map; - - // Get the queue associated with a sender. - queue* sender_queue(const proton::sender& s) { - return queues_.get(s.source().address()); // Thread safe. - } - - // Only called if we have credit. Return true if we sent a message. - bool do_send(queue* q, proton::sender &s) { - proton::message m; - bool popped = q->pop(m, has_messages_callback_); - if (popped) - s.send(m); - /// if !popped the queue has saved the callback for later. - return popped; - } - - // Called via the connection's proton::event_loop when q has messages. - // Try all the blocked senders. - void has_messages(queue* q) { - auto range = blocked_.equal_range(q); - for (auto i = range.first; i != range.second;) { - if (i->second.credit() <= 0 || do_send(q, i->second)) - i = blocked_.erase(i); // No credit or send was successful, stop blocked. - else - ++i; // have credit, didn't send, keep blocked - } - } - - // Use to erase closed senders from blocked_ set. - template <class Predicate> - void erase_sender_if(blocked_map::iterator begin, blocked_map::iterator end, Predicate p) { - for (auto i = begin; i != end; ) { - if (p(i->second)) - i = blocked_.erase(i); - else - ++i; - } - } - - queues& queues_; - blocked_map blocked_; - std::function<void(queue*)> has_messages_callback_; - proton::connection connection_; -}; - - -class broker { - public: - broker(const std::string addr) : - container_(make_mt_container("mt_broker")), listener_(queues_) - { - container_->listen(addr, listener_); - std::cout << "broker listening on " << addr << std::endl; - } - - void run() { - std::vector<std::thread> threads(std::thread::hardware_concurrency()-1); - for (auto& t : threads) - t = std::thread(&proton::container::run, container_.get()); - container_->run(); // Use this thread too. 
- for (auto& t : threads) - t.join(); - } - - private: - struct listener : public proton::listen_handler { - listener(queues& qs) : queues_(qs) {} - - proton::connection_options on_accept() OVERRIDE{ - return proton::connection_options().handler(*(new broker_connection_handler(queues_))); - } - - void on_error(const std::string& s) OVERRIDE { - std::cerr << "listen error: " << s << std::endl; - throw std::runtime_error(s); - } - queues& queues_; - }; - - queues queues_; - std::unique_ptr<proton::container> container_; - listener listener_; -}; - -int main(int argc, char **argv) { - // Command line options - std::string address("0.0.0.0"); - example::options opts(argc, argv); - opts.add_value(address, 'a', "address", "listen on URL", "URL"); - try { - opts.parse(); - broker(address).run(); - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << "broker shutdown: " << e.what() << std::endl; - } - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/mt/epoll_container.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/epoll_container.cpp b/examples/cpp/mt/epoll_container.cpp deleted file mode 100644 index 7646673..0000000 --- a/examples/cpp/mt/epoll_container.cpp +++ /dev/null @@ -1,541 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "mt_container.hpp" - -#include <proton/default_container.hpp> -#include <proton/event_loop.hpp> -#include <proton/listen_handler.hpp> -#include <proton/url.hpp> - -#include <proton/io/container_impl_base.hpp> -#include <proton/io/connection_driver.hpp> -#include <proton/io/link_namer.hpp> - -#include <atomic> -#include <memory> -#include <mutex> -#include <condition_variable> -#include <thread> -#include <set> -#include <sstream> -#include <system_error> - -// Linux native IO -#include <assert.h> -#include <errno.h> -#include <fcntl.h> -#include <netdb.h> -#include <sys/epoll.h> -#include <sys/eventfd.h> -#include <unistd.h> - -#include "../fake_cpp11.hpp" - -// Private implementation -namespace { - - -using lock_guard = std::lock_guard<std::mutex>; - -// Get string from errno -std::string errno_str(const std::string& msg) { - return std::system_error(errno, std::system_category(), msg).what(); -} - -// Throw proton::error(errno_str(msg)) if result < 0 -int check(int result, const std::string& msg) { - if (result < 0) - throw proton::error(errno_str(msg)); - return result; -} - -// Wrapper for getaddrinfo() that cleans up in destructor. 
-class unique_addrinfo { - public: - unique_addrinfo(const std::string& addr) : addrinfo_(0) { - proton::url u(addr); - int result = ::getaddrinfo(char_p(u.host()), char_p(u.port()), 0, &addrinfo_); - if (result) - throw proton::error(std::string("bad address: ") + gai_strerror(result)); - } - ~unique_addrinfo() { if (addrinfo_) ::freeaddrinfo(addrinfo_); } - - ::addrinfo* operator->() const { return addrinfo_; } - - private: - static const char* char_p(const std::string& s) { return s.empty() ? 0 : s.c_str(); } - ::addrinfo *addrinfo_; -}; - -// File descriptor wrapper that calls ::close in destructor. -class unique_fd { - public: - unique_fd(int fd) : fd_(fd) {} - ~unique_fd() { if (fd_ >= 0) ::close(fd_); } - operator int() const { return fd_; } - int release() { int ret = fd_; fd_ = -1; return ret; } - - protected: - int fd_; -}; - -class pollable; -class pollable_driver; -class pollable_listener; - -class epoll_container : public proton::io::container_impl_base { - public: - epoll_container(const std::string& id); - ~epoll_container(); - - // Pull in base class functions here so that name search finds all the overloads - using standard_container::stop; - using standard_container::connect; - using standard_container::listen; - - proton::returned<proton::connection> connect( - const std::string& addr, const proton::connection_options& opts) OVERRIDE; - - proton::listener listen(const std::string& addr, proton::listen_handler&) OVERRIDE; - - void stop_listening(const std::string& addr) OVERRIDE; - - void run() OVERRIDE; - void auto_stop(bool) OVERRIDE; - void stop(const proton::error_condition& err) OVERRIDE; - - std::string id() const OVERRIDE { return id_; } - - // Functions used internally. - proton::connection add_driver(proton::connection_options opts, int fd, bool server); - void erase(pollable*); - - // Link names must be unique per container. - // Generate unique names with a simple atomic counter. 
- class atomic_link_namer : public proton::io::link_namer { - public: - std::string link_name() { - std::ostringstream o; - o << std::hex << ++count_; - return o.str(); - } - private: - std::atomic<int> count_; - }; - - // FIXME aconway 2016-06-07: Unfinished - void schedule(proton::duration, std::function<void()>) OVERRIDE { throw std::logic_error("FIXME"); } - void schedule(proton::duration, proton::void_function0&) OVERRIDE { throw std::logic_error("FIXME"); } - atomic_link_namer link_namer; - - private: - template <class T> void store(T& v, const T& x) const { lock_guard g(lock_); v = x; } - - void idle_check(const lock_guard&); - void interrupt(); - void wait(); - - const std::string id_; - const unique_fd epoll_fd_; - const unique_fd interrupt_fd_; - - mutable std::mutex lock_; - - proton::connection_options options_; - std::map<std::string, std::unique_ptr<pollable_listener> > listeners_; - std::map<pollable*, std::unique_ptr<pollable_driver> > drivers_; - - std::condition_variable stopped_; - bool stopping_; - proton::error_condition stop_err_; - std::atomic<size_t> threads_; -}; - -// Base class for pollable file-descriptors. Manages epoll interaction, -// subclasses implement virtual work() to do their serialized work. -class pollable { - public: - pollable(int fd, int epoll_fd) : fd_(fd), epoll_fd_(epoll_fd), notified_(false), working_(false) - { - int flags = check(::fcntl(fd, F_GETFL, 0), "non-blocking"); - check(::fcntl(fd, F_SETFL, flags | O_NONBLOCK), "non-blocking"); - ::epoll_event ev = {}; - ev.data.ptr = this; - ::epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, fd_, &ev); - } - - virtual ~pollable() { - ::epoll_event ev = {}; - ::epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, fd_, &ev); // Ignore errors. - } - - bool do_work(uint32_t events) { - { - lock_guard g(lock_); - if (working_) - return true; // Another thread is already working. - working_ = true; - notified_ = false; - } - uint32_t new_events = work(events); // Serialized, outside the lock. 
- if (new_events) { - lock_guard g(lock_); - rearm(notified_ ? EPOLLIN|EPOLLOUT : new_events); - } - return new_events; - } - - // Called from any thread to wake up the connection handler. - void notify() { - lock_guard g(lock_); - if (!notified_) { - notified_ = true; - if (!working_) // No worker thread, rearm now. - rearm(EPOLLIN|EPOLLOUT); - } - } - - protected: - - // Subclass implements work. - // Returns epoll events to re-enable or 0 if finished. - virtual uint32_t work(uint32_t events) = 0; - - const unique_fd fd_; - const int epoll_fd_; - - private: - - void rearm(uint32_t events) { - epoll_event ev; - ev.data.ptr = this; - ev.events = EPOLLONESHOT | events; - check(::epoll_ctl(epoll_fd_, EPOLL_CTL_MOD, fd_, &ev), "re-arm epoll"); - working_ = false; - } - - std::mutex lock_; - bool notified_; - bool working_; -}; - -class epoll_event_loop : public proton::event_loop { - public: - typedef std::vector<std::function<void()> > jobs; - - epoll_event_loop(pollable& p) : pollable_(p), closed_(false) {} - - bool inject(std::function<void()> f) OVERRIDE { - // Note this is an unbounded work queue. - // A resource-safe implementation should be bounded. - lock_guard g(lock_); - if (closed_) - return false; - jobs_.push_back(f); - pollable_.notify(); - return true; - } - - bool inject(proton::void_function0& f) OVERRIDE { - return inject([&f]() { f(); }); - } - - jobs pop_all() { - lock_guard g(lock_); - return std::move(jobs_); - } - - void close() { - lock_guard g(lock_); - closed_ = true; - } - - private: - std::mutex lock_; - pollable& pollable_; - jobs jobs_; - bool closed_; -}; - -// Handle epoll wakeups for a connection_driver. 
-class pollable_driver : public pollable { - public: - pollable_driver(epoll_container& c, int fd, int epoll_fd) : - pollable(fd, epoll_fd), - loop_(new epoll_event_loop(*this)), - driver_(c, loop_) - { - proton::connection conn = driver_.connection(); - proton::io::set_link_namer(conn, c.link_namer); - } - - ~pollable_driver() { - loop_->close(); // No calls to notify() after this. - driver_.dispatch(); // Run any final events. - try { write(); } catch(...) {} // Write connection close if we can. - for (auto f : loop_->pop_all()) {// Run final queued work for side-effects. - try { f(); } catch(...) {} - } - } - - uint32_t work(uint32_t events) { - try { - bool can_read = events & EPOLLIN, can_write = events & EPOLLOUT; - do { - can_write = can_write && write(); - can_read = can_read && read(); - for (auto f : loop_->pop_all()) // Run queued work - f(); - driver_.dispatch(); - } while (can_read || can_write); - return (driver_.read_buffer().size ? EPOLLIN:0) | - (driver_.write_buffer().size ? 
EPOLLOUT:0); - } catch (const std::exception& e) { - driver_.disconnected(proton::error_condition("exception", e.what())); - } - return 0; // Ending - } - - proton::io::connection_driver& driver() { return driver_; } - - private: - static bool try_again(int e) { - // These errno values from read or write mean "try again" - return (e == EAGAIN || e == EWOULDBLOCK || e == EINTR); - } - - bool write() { - proton::io::const_buffer wbuf(driver_.write_buffer()); - if (wbuf.size) { - ssize_t n = ::write(fd_, wbuf.data, wbuf.size); - if (n > 0) { - driver_.write_done(n); - return true; - } else if (n < 0 && !try_again(errno)) { - check(n, "write"); - } - } - return false; - } - - bool read() { - proton::io::mutable_buffer rbuf(driver_.read_buffer()); - if (rbuf.size) { - ssize_t n = ::read(fd_, rbuf.data, rbuf.size); - if (n > 0) { - driver_.read_done(n); - return true; - } - else if (n == 0) - driver_.read_close(); - else if (!try_again(errno)) - check(n, "read"); - } - return false; - } - - // Lifecycle note: loop_ belongs to the proton::connection, which can live - // longer than the driver if the application holds a reference to it, we - // disconnect ourselves with loop_->close() in ~connection_driver() - epoll_event_loop* loop_; - proton::io::connection_driver driver_; -}; - -// A pollable listener fd that creates pollable_driver for incoming connections. -class pollable_listener : public pollable { - public: - pollable_listener( - const std::string& addr, - proton::listen_handler& l, - int epoll_fd, - epoll_container& c - ) : - pollable(socket_listen(addr), epoll_fd), - addr_(addr), - container_(c), - listener_(l) - {} - - uint32_t work(uint32_t events) { - if (events & EPOLLRDHUP) { - try { listener_.on_close(); } catch (...) 
{} - return 0; - } - try { - int accepted = check(::accept(fd_, NULL, 0), "accept"); - container_.add_driver(listener_.on_accept(), accepted, true); - return EPOLLIN; - } catch (const std::exception& e) { - listener_.on_error(e.what()); - return 0; - } - } - - std::string addr() { return addr_; } - - private: - - static int socket_listen(const std::string& addr) { - std::string msg = "listen on "+addr; - unique_addrinfo ainfo(addr); - unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg)); - int yes = 1; - check(::setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), msg); - check(::bind(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg); - check(::listen(fd, 32), msg); - return fd.release(); - } - - std::string addr_; - std::function<proton::connection_options(const std::string&)> factory_; - epoll_container& container_; - proton::connection_options opts_; - proton::listen_handler& listener_; -}; - - -epoll_container::epoll_container(const std::string& id) - : id_(id), epoll_fd_(check(epoll_create(1), "epoll_create")), - interrupt_fd_(check(eventfd(1, 0), "eventfd")), - stopping_(false), threads_(0) -{} - -epoll_container::~epoll_container() { - try { - stop(proton::error_condition("exception", "container shut-down")); - wait(); - } catch (...) 
{} -} - -proton::connection epoll_container::add_driver(proton::connection_options opts, int fd, bool server) -{ - lock_guard g(lock_); - if (stopping_) - throw proton::error("container is stopping"); - std::unique_ptr<pollable_driver> eng(new pollable_driver(*this, fd, epoll_fd_)); - if (server) - eng->driver().accept(opts); - else - eng->driver().connect(opts); - proton::connection c = eng->driver().connection(); - eng->notify(); - drivers_[eng.get()] = std::move(eng); - return c; -} - -void epoll_container::erase(pollable* e) { - lock_guard g(lock_); - if (!drivers_.erase(e)) { - pollable_listener* l = dynamic_cast<pollable_listener*>(e); - if (l) - listeners_.erase(l->addr()); - } - idle_check(g); -} - -void epoll_container::idle_check(const lock_guard&) { - if (stopping_ && drivers_.empty() && listeners_.empty()) - interrupt(); -} - -proton::returned<proton::connection> epoll_container::connect( - const std::string& addr, const proton::connection_options& opts) -{ - std::string msg = "connect to "+addr; - unique_addrinfo ainfo(addr); - unique_fd fd(check(::socket(ainfo->ai_family, SOCK_STREAM, 0), msg)); - check(::connect(fd, ainfo->ai_addr, ainfo->ai_addrlen), msg); - return make_thread_safe(add_driver(opts, fd.release(), false)); -} - -proton::listener epoll_container::listen(const std::string& addr, proton::listen_handler& lh) { - lock_guard g(lock_); - if (stopping_) - throw proton::error("container is stopping"); - auto& l = listeners_[addr]; - try { - l.reset(new pollable_listener(addr, lh, epoll_fd_, *this)); - l->notify(); - return proton::listener(*this, addr); - } catch (const std::exception& e) { - lh.on_error(e.what()); - lh.on_close(); - throw; - } -} - -void epoll_container::stop_listening(const std::string& addr) { - lock_guard g(lock_); - listeners_.erase(addr); - idle_check(g); -} - -void epoll_container::run() { - ++threads_; - try { - epoll_event e; - while(true) { - check(::epoll_wait(epoll_fd_, &e, 1, -1), "epoll_wait"); - pollable* p = 
reinterpret_cast<pollable*>(e.data.ptr); - if (!p) - break; // Interrupted - if (!p->do_work(e.events)) - erase(p); - } - } catch (const std::exception& e) { - stop(proton::error_condition("exception", e.what())); - } - if (--threads_ == 0) - stopped_.notify_all(); -} - -void epoll_container::auto_stop(bool set) { - lock_guard g(lock_); - stopping_ = set; -} - -void epoll_container::stop(const proton::error_condition& err) { - lock_guard g(lock_); - stop_err_ = err; - interrupt(); -} - -void epoll_container::wait() { - std::unique_lock<std::mutex> l(lock_); - stopped_.wait(l, [this]() { return this->threads_ == 0; } ); - for (auto& eng : drivers_) - eng.second->driver().disconnected(stop_err_); - listeners_.clear(); - drivers_.clear(); -} - -void epoll_container::interrupt() { - // Add an always-readable fd with 0 data and no ONESHOT to interrupt all threads. - epoll_event ev = {}; - ev.events = EPOLLIN; - check(epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, interrupt_fd_, &ev), "interrupt"); -} - -} - -// This is the only public function. -std::unique_ptr<proton::container> make_mt_container(const std::string& id) { - return std::unique_ptr<proton::container>(new epoll_container(id)); -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/mt/mt_container.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/mt/mt_container.hpp b/examples/cpp/mt/mt_container.hpp deleted file mode 100644 index 164fe72..0000000 --- a/examples/cpp/mt/mt_container.hpp +++ /dev/null @@ -1,29 +0,0 @@ -#ifndef MT_MT_CONTROLLER_HPP -#define MT_MT_CONTROLLER_HPP - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <proton/default_container.hpp> -#include <memory> - -// Defined in whichever MT container implementation we are linked with. -std::unique_ptr<proton::container> make_mt_container(const std::string& id); - -#endif // MT_MT_DEFAULT_CONTAINER.HPP http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/options.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/options.hpp b/examples/cpp/options.hpp deleted file mode 100644 index dab1bc2..0000000 --- a/examples/cpp/options.hpp +++ /dev/null @@ -1,175 +0,0 @@ -#ifndef OPTIONS_HPP -#define OPTIONS_HPP -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
// See the License for the - * specific language governing permissions and limitations - * under the License. - */

#include <string>
#include <sstream>
#include <ostream>
#include <vector>
#include <stdexcept>

namespace example {
/** bad_option is thrown for option parsing errors */
struct bad_option : public std::runtime_error {
    bad_option(const std::string& s) : std::runtime_error(s) {}
};

/**
 * Simple command-line option parser for example programs.
 *
 * Options are registered with add_value() / add_flag(), which bind each
 * option to a caller-owned variable by reference; those variables must
 * outlive the parser. A "-h" / "--help" flag is always registered by the
 * constructor.
 *
 * The parser owns the option objects it allocates and deletes them in its
 * destructor, so it is deliberately non-copyable.
 */
class options {
  public:

    /**
     * @param argc number of entries in argv
     * @param argv argument vector; argv[0], when present, is used (without
     *        any leading directory part) as the program name in the usage text
     */
    options(int argc, char const * const * argv) :
        argc_(argc), argv_(argv),
        // argv[0] can be absent (argc == 0); never build a std::string from NULL.
        prog_((argc > 0 && argv && argv[0]) ? argv[0] : ""),
        help_()
    {
        std::string::size_type slash = prog_.find_last_of("/\\");
        if (slash != std::string::npos)
            prog_ = prog_.substr(slash+1); // Extract prog name from path
        add_flag(help_, 'h', "help", "Print the help message");
    }

    ~options() {
        for (opts::iterator i = opts_.begin(); i != opts_.end(); ++i)
            delete *i;
    }

    /** Updates value when parse() is called if option is present with a value. */
    template<class T>
    void add_value(T& value, char short_name, const std::string& long_name, const std::string& description, const std::string& var) {
        adopt(new option_value<T>(value, short_name, long_name, description, var));
    }

    /** Sets flag when parse() is called if option is present. The flag is reset to false on registration. */
    void add_flag(bool& flag, char short_name, const std::string& long_name, const std::string& description) {
        adopt(new option_flag(flag, short_name, long_name, description));
    }

    /** Parse the command line, return the index of the first non-option argument.
     *@throws bad_option if there is a parsing error or unknown option.
     * A bad_option with an empty message is thrown when the help flag was given.
     */
    int parse() {
        int arg = 1;
        for (; arg < argc_ && argv_[arg][0] == '-'; ++arg) {
            // Offer the argument to each registered option in turn; an option
            // that takes a separate value advances arg past that value.
            opts::iterator i = opts_.begin();
            while (i != opts_.end() && !(*i)->parse(argc_, argv_, arg))
                ++i;
            if (i == opts_.end())
                throw bad_option(std::string("unknown option ") + argv_[arg]);
        }
        if (help_) throw bad_option("");
        return arg;
    }

    /** Print a usage message */
    friend std::ostream& operator<<(std::ostream& os, const options& op) {
        os << std::endl << "usage: " << op.prog_ << " [options]" << std::endl;
        os << std::endl << "options:" << std::endl;
        for (opts::const_iterator i = op.opts_.begin(); i != op.opts_.end(); ++i)
            os << **i << std::endl;
        return os;
    }

  private:
    // Copying would make two parsers delete the same option objects.
    options(const options&);            // not implemented
    options& operator=(const options&); // not implemented

    /** Common base: holds the names/description and prints the help entry. */
    class option {
      public:
        option(char s, const std::string& l, const std::string& d, const std::string& v) :
            short_(std::string("-") + s), long_("--" + l), desc_(d), var_(v) {}
        virtual ~option() {}

        /** Try to consume argv[i]; return true if this option matched it. */
        virtual bool parse(int argc, char const * const * argv, int &i) = 0;
        virtual void print_default(std::ostream&) const {}

        friend std::ostream& operator<<(std::ostream& os, const option& op) {
            os << " " << op.short_;
            if (!op.var_.empty()) os << " " << op.var_;
            os << ", " << op.long_;
            if (!op.var_.empty()) os << "=" << op.var_;
            os << std::endl << " " << op.desc_;
            op.print_default(os);
            return os;
        }

      protected:
        std::string short_, long_, desc_, var_;
    };

    /** Option that stores a value of type T, read with operator>>. */
    template <class T>
    class option_value : public option {
      public:
        option_value(T& value, char s, const std::string& l, const std::string& d, const std::string& v) :
            option(s, l, d, v), value_(value) {}

        bool parse(int argc, char const * const * argv, int &i) {
            std::string arg(argv[i]);
            // "-x VALUE" or "--name VALUE": the value is the next argument.
            if (arg == short_ || arg == long_) {
                if (i >= argc-1)
                    throw bad_option("missing value for " + arg);
                set_value(arg, argv[++i]);
                return true;
            }
            // "--name=VALUE": the value follows the '=' in the same argument.
            if (arg.size() > long_.size() &&
                arg.compare(0, long_.size(), long_) == 0 && arg[long_.size()] == '=') {
                set_value(long_, arg.substr(long_.size()+1));
                return true;
            }
            return false;
        }

        virtual void print_default(std::ostream& os) const { os << " (default " << value_ << ")"; }

        /** Convert s with operator>>; throws bad_option if the extraction fails. */
        void set_value(const std::string& opt, const std::string& s) {
            std::istringstream is(s);
            is >> value_;
            if (is.fail() || is.bad())
                throw bad_option("bad value for " + opt + ": " + s);
        }

      private:
        T& value_;
    };

    /** Option without a value: sets a bool to true when present. */
    class option_flag: public option {
      public:
        option_flag(bool& flag, const char s, const std::string& l, const std::string& d) :
            option(s, l, d, ""), flag_(flag)
        { flag_ = false; }

        bool parse(int /*argc*/, char const * const * argv, int &i) {
            if (argv[i] == short_ || argv[i] == long_) {
                flag_ = true;
                return true;
            } else {
                return false;
            }
        }

      private:
        bool &flag_;
    };

    typedef std::vector<option*> opts;

    /** Take ownership of opt; it is freed here if storing it fails. */
    void adopt(option* opt) {
        try {
            opts_.push_back(opt);
        } catch (...) {
            delete opt;
            throw;
        }
    }

    int argc_;
    char const * const * argv_;
    std::string prog_;
    opts opts_;
    bool help_;
};
}

// The matching #ifndef/#define sit inside the preceding flattened line, so the closing guard and the
// start of the next file's diff are kept verbatim as text:
// #endif // OPTIONS_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/queue_browser.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp deleted file mode 100644 index 583277e..0000000 --- a/examples/cpp/queue_browser.cpp +++ /dev/null @@ -1,70 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied.
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/connection.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/delivery.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/receiver_options.hpp> -#include <proton/source_options.hpp> -#include <proton/thread_safe.hpp> -#include <proton/url.hpp> - -#include <iostream> - -#include "fake_cpp11.hpp" - -using proton::source_options; - -class browser : public proton::messaging_handler { - private: - proton::url url; - - public: - browser(const std::string& u) : url(u) {} - - void on_container_start(proton::container &c) OVERRIDE { - proton::connection conn = c.connect(url); - source_options browsing = source_options().distribution_mode(proton::source::COPY); - conn.open_receiver(url.path(), proton::receiver_options().source(browsing)); - } - - void on_message(proton::delivery &, proton::message &m) OVERRIDE { - std::cout << m.body() << std::endl; - } -}; - -int main(int argc, char **argv) { - try { - std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples"; - - browser b(url); - proton::default_container(b).run(); - - return 0; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/scheduled_send.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/scheduled_send.cpp b/examples/cpp/scheduled_send.cpp deleted file mode 100644 index ef6cd27..0000000 --- a/examples/cpp/scheduled_send.cpp +++ /dev/null @@ -1,113 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "options.hpp" - -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/sender.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> - -#include <iostream> - -#include "fake_cpp11.hpp" - -// Send messages at a constant rate one per interval. cancel after a timeout. -class scheduled_sender : public proton::messaging_handler { - private: - std::string url; - proton::sender sender; - proton::duration interval, timeout; - bool ready, canceled; - - public: - - scheduled_sender(const std::string &s, double d, double t) : - url(s), - interval(int(d*proton::duration::SECOND.milliseconds())), // Send interval. - timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout. - ready(true), // Ready to send. - canceled(false) // Canceled. - {} - - void on_container_start(proton::container &c) OVERRIDE { - sender = c.open_sender(url); - // Call this->cancel after timeout. - c.schedule(timeout, [this]() { this->cancel(); }); - // Start regular ticks every interval. - c.schedule(interval, [this]() { this->tick(); }); - } - - void cancel() { - canceled = true; - sender.connection().close(); - } - - void tick() { - // Schedule the next tick unless we have been cancelled. 
- if (!canceled) - sender.container().schedule(interval, [this]() { this->tick(); }); - if (sender.credit() > 0) // Only send if we have credit - send(); - else - ready = true; // Set the ready flag, send as soon as we get credit. - } - - void on_sendable(proton::sender &) OVERRIDE { - if (ready) // We have been ticked since the last send. - send(); - } - - void send() { - std::cout << "send" << std::endl; - sender.send(proton::message("ping")); - ready = false; - } -}; - - -int main(int argc, char **argv) { - std::string address("127.0.0.1:5672/examples"); - double interval = 1.0; - double timeout = 5.0; - - example::options opts(argc, argv); - - opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); - opts.add_value(interval, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL"); - opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T"); - - try { - opts.parse(); - scheduled_sender h(address, interval, timeout); - proton::default_container(h).run(); - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/scheduled_send_03.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/scheduled_send_03.cpp b/examples/cpp/scheduled_send_03.cpp deleted file mode 100644 index 92e5767..0000000 --- a/examples/cpp/scheduled_send_03.cpp +++ /dev/null @@ -1,132 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "options.hpp" - -#include <proton/container.hpp> -#include <proton/connection.hpp> -#include <proton/default_container.hpp> -#include <proton/duration.hpp> -#include <proton/function.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/sender.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> - -#include <iostream> - -#include "fake_cpp11.hpp" - -// Send messages at a constant rate one per interval. cancel after a timeout. -// This example uses only C++03 features. -class scheduled_sender : public proton::messaging_handler { - private: - std::string url; - proton::sender sender; - proton::duration interval, timeout; - bool ready, canceled; - - struct tick_fn : public proton::void_function0 { - scheduled_sender& parent; - tick_fn(scheduled_sender& ss) : parent(ss) {} - void operator()() { parent.tick(); } - }; - - struct cancel_fn : public proton::void_function0 { - scheduled_sender& parent; - cancel_fn(scheduled_sender& ss) : parent(ss) {} - void operator()() { parent.cancel(); } - }; - - tick_fn do_tick; - cancel_fn do_cancel; - - public: - - scheduled_sender(const std::string &s, double d, double t) : - url(s), - interval(int(d*proton::duration::SECOND.milliseconds())), // Send interval. - timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel after timeout. - ready(true), // Ready to send. - canceled(false), // Canceled. 
- do_tick(*this), - do_cancel(*this) - {} - - void on_container_start(proton::container &c) OVERRIDE { - sender = c.open_sender(url); - c.schedule(timeout, do_cancel); // Call this->cancel after timeout. - c.schedule(interval, do_tick); // Start regular ticks every interval. - } - - void cancel() { - canceled = true; - sender.connection().close(); - } - - void tick() { - if (!canceled) { - sender.container().schedule(interval, do_tick); // Next tick - if (sender.credit() > 0) // Only send if we have credit - send(); - else - ready = true; // Set the ready flag, send as soon as we get credit. - } - } - - void on_sendable(proton::sender &) OVERRIDE { - if (ready) // We have been ticked since the last send. - send(); - } - - void send() { - std::cout << "send" << std::endl; - sender.send(proton::message("ping")); - ready = false; - } -}; - - -int main(int argc, char **argv) { - std::string address("127.0.0.1:5672/examples"); - double interval = 1.0; - double timeout = 5.0; - - example::options opts(argc, argv); - - opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); - opts.add_value(interval, 'i', "interval", "send a message every INTERVAL seconds", "INTERVAL"); - opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T"); - - try { - opts.parse(); - scheduled_sender h(address, interval, timeout); - proton::default_container(h).run(); - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/selected_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/selected_recv.cpp b/examples/cpp/selected_recv.cpp deleted file mode 100644 index a48ef0e..0000000 --- a/examples/cpp/selected_recv.cpp +++ /dev/null @@ -1,91 +0,0 @@ -/* - * - * Licensed to the Apache 
Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/connection.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/receiver_options.hpp> -#include <proton/source_options.hpp> -#include <proton/thread_safe.hpp> -#include <proton/url.hpp> - -#include <iostream> - -#include "fake_cpp11.hpp" - -namespace { - - // Example custom function to configure an AMQP filter, - // specifically an APACHE.ORG:SELECTOR - // (http://www.amqp.org/specification/1.0/filters) - - void set_filter(proton::source_options &opts, const std::string& selector_str) { - proton::source::filter_map map; - proton::symbol filter_key("selector"); - proton::value filter_value; - // The value is a specific AMQP "described type": binary string with symbolic descriptor - proton::codec::encoder enc(filter_value); - enc << proton::codec::start::described() - << proton::symbol("apache.org:selector-filter:string") - << proton::binary(selector_str) - << proton::codec::finish(); - // In our case, the map has this one element - map.put(filter_key, filter_value); - opts.filters(map); - } -} - - -class selected_recv : public proton::messaging_handler { 
- private: - proton::url url; - - public: - selected_recv(const std::string& u) : url(u) {} - - void on_container_start(proton::container &c) OVERRIDE { - proton::source_options opts; - set_filter(opts, "colour = 'green'"); - proton::connection conn = c.connect(url); - conn.open_receiver(url.path(), proton::receiver_options().source(opts)); - } - - void on_message(proton::delivery &, proton::message &m) OVERRIDE { - std::cout << m.body() << std::endl; - } -}; - -int main(int argc, char **argv) { - try { - std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples"; - - selected_recv recv(url); - proton::default_container(recv).run(); - - return 0; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/server.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/server.cpp b/examples/cpp/server.cpp deleted file mode 100644 index 449ce6e..0000000 --- a/examples/cpp/server.cpp +++ /dev/null @@ -1,104 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "options.hpp" - -#include <proton/connection.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/message.hpp> -#include <proton/message_id.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> -#include <proton/url.hpp> - -#include <iostream> -#include <map> -#include <string> -#include <cctype> - -#include "fake_cpp11.hpp" - -class server : public proton::messaging_handler { - private: - typedef std::map<std::string, proton::sender> sender_map; - proton::url url; - proton::connection connection; - sender_map senders; - - public: - server(const std::string &u) : url(u) {} - - void on_container_start(proton::container &c) OVERRIDE { - connection = c.connect(url); - connection.open_receiver(url.path()); - - std::cout << "server connected to " << url << std::endl; - } - - std::string to_upper(const std::string &s) { - std::string uc(s); - size_t l = uc.size(); - for (size_t i=0; i<l; i++) - uc[i] = static_cast<char>(std::toupper(uc[i])); - return uc; - } - - void on_message(proton::delivery &, proton::message &m) OVERRIDE { - std::cout << "Received " << m.body() << std::endl; - - std::string reply_to = m.reply_to(); - proton::message reply; - - reply.to(reply_to); - reply.body(to_upper(proton::get<std::string>(m.body()))); - reply.correlation_id(m.correlation_id()); - - if (!senders[reply_to]) { - senders[reply_to] = connection.open_sender(reply_to); - } - - senders[reply_to].send(reply); - } -}; - -int main(int argc, char **argv) { - std::string address("amqp://0.0.0.0:5672/examples"); - example::options opts(argc, argv); - - opts.add_value(address, 'a', "address", "listen on URL", "URL"); - - try { - opts.parse(); - - server srv(address); - proton::default_container(srv).run(); - - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << 
e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/server_direct.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/server_direct.cpp b/examples/cpp/server_direct.cpp deleted file mode 100644 index 22d519a..0000000 --- a/examples/cpp/server_direct.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "options.hpp" - -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/listener.hpp> -#include <proton/message.hpp> -#include <proton/message_id.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/sender.hpp> -#include <proton/sender_options.hpp> -#include <proton/source_options.hpp> -#include <proton/tracker.hpp> - -#include <iostream> -#include <map> -#include <string> -#include <sstream> -#include <cctype> - -#include "fake_cpp11.hpp" - -class server : public proton::messaging_handler { - private: - typedef std::map<std::string, proton::sender> sender_map; - std::string url; - sender_map senders; - int address_counter; - - public: - server(const std::string &u) : url(u), address_counter(0) {} - - void on_container_start(proton::container &c) OVERRIDE { - c.listen(url); - std::cout << "server listening on " << url << std::endl; - } - - std::string to_upper(const std::string &s) { - std::string uc(s); - size_t l = uc.size(); - - for (size_t i=0; i<l; i++) - uc[i] = static_cast<char>(std::toupper(uc[i])); - - return uc; - } - - std::string generate_address() { - std::ostringstream addr; - addr << "server" << address_counter++; - - return addr.str(); - } - - void on_sender_open(proton::sender &sender) OVERRIDE { - if (sender.source().dynamic()) { - std::string addr = generate_address(); - sender.open(proton::sender_options().source(proton::source_options().address(addr))); - senders[addr] = sender; - } - } - - void on_message(proton::delivery &, proton::message &m) OVERRIDE { - std::cout << "Received " << m.body() << std::endl; - - std::string reply_to = m.reply_to(); - sender_map::iterator it = senders.find(reply_to); - - if (it == senders.end()) { - std::cout << "No link for reply_to: " << reply_to << std::endl; - } else { - proton::sender sender = it->second; - proton::message reply; - - reply.to(reply_to); - reply.body(to_upper(proton::get<std::string>(m.body()))); - 
reply.correlation_id(m.correlation_id()); - - sender.send(reply); - } - } -}; - -int main(int argc, char **argv) { - std::string address("amqp://127.0.0.1:5672/examples"); - example::options opts(argc, argv); - - opts.add_value(address, 'a', "address", "listen on URL", "URL"); - - try { - opts.parse(); - - server srv(address); - proton::default_container(srv).run(); - - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/service_bus.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/service_bus.cpp b/examples/cpp/service_bus.cpp deleted file mode 100644 index 6b57f8d..0000000 --- a/examples/cpp/service_bus.cpp +++ /dev/null @@ -1,335 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/* - * Service Bus example. - * - * This is an example of using "Service Bus sessions" (not the same thing as an - * AMQP session) to selectively retrieve messages from a queue. 
The queue must - * be configured within Service Bus to support sessions. Service Bus uses the - * AMQP group_id message property to associate messages with a particular - * Service Bus session. It uses AMQP filters to specify which session is - * associated with a receiver. - * - * The mechanics for sending and receiving to other types of service bus queue - * are broadly the same, as long as the step using the - * receiver.source().filters() is omitted. - * - * Other Service Bus notes: There is no drain support, hence the need to use - * timeouts in this example to detect the end of the message stream. There is - * no browse support when setting the AMQP link distribution mode to COPY. - * Service Bus claims to support browsing, but it is unclear how to manage that - * with an AMQP client. Maximum message sizes (for body and headers) vary - * between queue types and fee tier, ranging from 64KB to 1MB. Due to the - * distributed nature of Service Bus, queues do not automatically preserve FIFO - * order of messages unless the user takes steps to force the message stream to - * a single partition of the queue or creates the queue with partitioning disabled. - * - * This example shows use of the simpler SAS (Shared Access Signature) - * authentication scheme where the credentials are supplied on the connection. - * Service Bus does not actually check these credentials when setting up the - * connection; it merely caches the SAS key and policy (AKA key name) for later - * access authorization when creating senders and receivers. There is a second - * authentication scheme that allows for multiple tokens and even updating them - * within a long-lived connection which uses special management request-response - * queues in Service Bus. The format of this exchange may be documented - * somewhere but is also available by working through the CbsAsyncExample.cs - * program in the Amqp.Net Lite project.
- * - * The sample output for this program is: - - sent message: message 0 in service bus session "red" - sent message: message 1 in service bus session "green" - sent message: message 2 in service bus session "blue" - sent message: message 3 in service bus session "red" - sent message: message 4 in service bus session "black" - sent message: message 5 in service bus session "blue" - sent message: message 6 in service bus session "yellow" -receiving messages with session identifier "green" from queue ses_q1 - received message: message 1 in service bus session "green" -receiving messages with session identifier "red" from queue ses_q1 - received message: message 0 in service bus session "red" - received message: message 3 in service bus session "red" -receiving messages with session identifier "blue" from queue ses_q1 - received message: message 2 in service bus session "blue" - received message: message 5 in service bus session "blue" -receiving messages with session identifier "black" from queue ses_q1 - received message: message 4 in service bus session "black" -receiving messages with session identifier "yellow" from queue ses_q1 - received message: message 6 in service bus session "yellow" -Done. No more messages. 
- - * - */ - -#include "options.hpp" - -#include <proton/connection.hpp> -#include <proton/connection_options.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/delivery.hpp> -#include <proton/function.hpp> -#include <proton/message.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/receiver_options.hpp> -#include <proton/sender.hpp> -#include <proton/sender_options.hpp> -#include <proton/source_options.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> -#include <proton/url.hpp> - -#include <iostream> -#include <sstream> - -#include "fake_cpp11.hpp" - -using proton::source_options; -using proton::connection_options; -using proton::sender_options; -using proton::receiver_options; - -void do_next_sequence(); - -namespace { -void check_arg(const std::string &value, const std::string &name) { - if (value.empty()) - throw std::runtime_error("missing argument for \"" + name + "\""); -} -} - -/// Connect to Service Bus queue and retrieve messages in a particular session. 
-class session_receiver : public proton::messaging_handler { - private: - const std::string &connection_url; - const std::string &entity; - proton::value session_identifier; // AMQP null type by default, matches any Service Bus sequence identifier - int message_count; - bool closed; - proton::duration read_timeout; - proton::timestamp last_read; - proton::container *container; - proton::receiver receiver; - - - struct process_timeout_fn : public proton::void_function0 { - session_receiver& parent; - process_timeout_fn(session_receiver& sr) : parent(sr) {} - void operator()() { parent.process_timeout(); } - }; - - process_timeout_fn do_process_timeout; - - - public: - session_receiver(const std::string &c, const std::string &e, - const char *sid) : connection_url(c), entity(e), message_count(0), closed(false), read_timeout(5000), - last_read(0), container(0), do_process_timeout(*this) { - if (sid) - session_identifier = std::string(sid); - // session_identifier is now either empty/null or an AMQP string type. - // If null, Service Bus will pick the first available message and create - // a filter at its end with that message's session identifier. - // Technically, an AMQP string is not a valid filter-set value unless it - // is annotated as an AMQP described type, so this may change. - - } - - void run (proton::container &c) { - message_count = 0; - closed = false; - c.connect(connection_url, connection_options().handler(*this)); - container = &c; - } - - void on_connection_open(proton::connection &connection) OVERRIDE { - proton::source::filter_map sb_filter_map; - proton::symbol key("com.microsoft:session-filter"); - sb_filter_map.put(key, session_identifier); - receiver = connection.open_receiver(entity, receiver_options().source(source_options().filters(sb_filter_map))); - - // Start timeout processing here. If Service Bus has no pending - // messages, it may defer completing the receiver open until a message - // becomes available (e.g. 
to be able to set the actual session - // identifier if none was specified). - last_read = proton::timestamp::now(); - // Call this->process_timeout after read_timeout. - container->schedule(read_timeout, do_process_timeout); - } - - void on_receiver_open(proton::receiver &r) OVERRIDE { - if (closed) return; // PROTON-1264 - proton::value actual_session_id = r.source().filters().get("com.microsoft:session-filter"); - std::cout << "receiving messages with session identifier \"" << actual_session_id - << "\" from queue " << entity << std::endl; - last_read = proton::timestamp::now(); - } - - void on_message(proton::delivery &, proton::message &m) OVERRIDE { - message_count++; - std::cout << " received message: " << m.body() << std::endl; - last_read = proton::timestamp::now(); - } - - void process_timeout() { - proton::timestamp deadline = last_read + read_timeout; - proton::timestamp now = proton::timestamp::now(); - if (now >= deadline) { - receiver.close(); - closed = true; - receiver.connection().close(); - if (message_count) - do_next_sequence(); - else - std::cout << "Done. No more messages." << std::endl; - } else { - proton::duration next = deadline - now; - container->schedule(next, do_process_timeout); - } - } -}; - - -/// Connect to Service Bus queue and send messages divided into different sessions. 
-class session_sender : public proton::messaging_handler { - private: - const std::string &connection_url; - const std::string &entity; - int msg_count; - int total; - int accepts; - - public: - session_sender(const std::string &c, const std::string &e) : connection_url(c), entity(e), - msg_count(0), total(7), accepts(0) {} - - void run(proton::container &c) { - c.open_sender(connection_url + "/" + entity, sender_options(), connection_options().handler(*this)); - } - - void send_remaining_messages(proton::sender &s) { - std::string gid; - for (; msg_count < total && s.credit() > 0; msg_count++) { - switch (msg_count) { - case 0: gid = "red"; break; - case 1: gid = "green"; break; - case 2: gid = "blue"; break; - case 3: gid = "red"; break; - case 4: gid = "black"; break; - case 5: gid = "blue"; break; - case 6: gid = "yellow"; break; - } - - std::ostringstream mbody; - mbody << "message " << msg_count << " in service bus session \"" << gid << "\""; - proton::message m(mbody.str()); - m.group_id(gid); // Service Bus uses the group_id property to as the session identifier. - s.send(m); - std::cout << " sent message: " << m.body() << std::endl; - } - } - - void on_sendable(proton::sender &s) OVERRIDE { - send_remaining_messages(s); - } - - void on_tracker_accept(proton::tracker &t) OVERRIDE { - accepts++; - if (accepts == total) { - // upload complete - t.sender().close(); - t.sender().connection().close(); - do_next_sequence(); - } - } -}; - - -/// Orchestrate the sequential actions of sending and receiving session-based messages. 
-class sequence : public proton::messaging_handler { - private: - proton::container *container; - int sequence_no; - session_sender snd; - session_receiver rcv_red, rcv_green, rcv_null; - - public: - static sequence *the_sequence; - - sequence (const std::string &c, const std::string &e) : sequence_no(0), - snd(c, e), rcv_red(c, e, "red"), rcv_green(c, e, "green"), rcv_null(c, e, NULL) { - the_sequence = this; - } - - void on_container_start(proton::container &c) OVERRIDE { - container = &c; - next_sequence(); - } - - void next_sequence() { - switch (sequence_no++) { - // run these in order exactly once - case 0: snd.run(*container); break; - case 1: rcv_green.run(*container); break; - case 2: rcv_red.run(*container); break; - // Run this until the receiver decides there is no messages left to sequence through - default: rcv_null.run(*container); break; - } - } -}; - -sequence *sequence::the_sequence = NULL; - -void do_next_sequence() { sequence::the_sequence->next_sequence(); } - - -int main(int argc, char **argv) { - std::string sb_namespace; // i.e. "foo.servicebus.windows.net" - // Make sure the next two are urlencoded for Proton - std::string sb_key_name; // shared access key name for entity (AKA "Policy Name") - std::string sb_key; // shared access key - std::string sb_entity; // AKA the service bus queue. Must enable - // sessions on it for this example. 
- - example::options opts(argc, argv); - opts.add_value(sb_namespace, 'n', "namespace", "Service Bus full namespace", "NAMESPACE"); - opts.add_value(sb_key_name, 'p', "policy", "policy name that specifies access rights (key name)", "POLICY"); - opts.add_value(sb_key, 'k', "key", "secret key for the policy", "key"); - opts.add_value(sb_entity, 'e', "entity", "entity path (queue name)", "ENTITY"); - - try { - opts.parse(); - check_arg(sb_namespace, "namespace"); - check_arg(sb_key_name, "policy"); - check_arg(sb_key, "key"); - check_arg(sb_entity, "entity"); - std::string connection_string("amqps://" + sb_key_name + ":" + sb_key + "@" + sb_namespace); - - sequence seq(connection_string, sb_entity); - proton::default_container(seq).run(); - return 0; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/simple_recv.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/simple_recv.cpp b/examples/cpp/simple_recv.cpp deleted file mode 100644 index 145eef9..0000000 --- a/examples/cpp/simple_recv.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "options.hpp" - -#include <proton/connection.hpp> -#include <proton/connection_options.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/delivery.hpp> -#include <proton/link.hpp> -#include <proton/message.hpp> -#include <proton/message_id.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/thread_safe.hpp> -#include <proton/value.hpp> - -#include <iostream> -#include <map> - -#include "fake_cpp11.hpp" - -class simple_recv : public proton::messaging_handler { - private: - std::string url; - std::string user; - std::string password; - proton::receiver receiver; - int expected; - int received; - - public: - simple_recv(const std::string &s, const std::string &u, const std::string &p, int c) : - url(s), user(u), password(p), expected(c), received(0) {} - - void on_container_start(proton::container &c) OVERRIDE { - proton::connection_options co; - if (!user.empty()) co.user(user); - if (!password.empty()) co.password(password); - receiver = c.open_receiver(url, co); - std::cout << "simple_recv listening on " << url << std::endl; - } - - void on_message(proton::delivery &d, proton::message &msg) OVERRIDE { - if (proton::coerce<int>(msg.id()) < received) { - return; // Ignore duplicate - } - - if (expected == 0 || received < expected) { - std::cout << msg.body() << std::endl; - received++; - - if (received == expected) { - d.receiver().close(); - d.connection().close(); - } - } - } -}; - -int main(int argc, char **argv) { - std::string address("127.0.0.1:5672/examples"); - std::string user; - std::string password; - int message_count = 100; - example::options opts(argc, argv); - - opts.add_value(address, 'a', "address", "connect to and receive from URL", "URL"); - opts.add_value(message_count, 'm', "messages", "receive COUNT messages", "COUNT"); - opts.add_value(user, 'u', "user", 
"authenticate as USER", "USER"); - opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD"); - - - try { - opts.parse(); - - simple_recv recv(address, user, password, message_count); - proton::default_container(recv).run(); - - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/cpp/simple_send.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/simple_send.cpp b/examples/cpp/simple_send.cpp deleted file mode 100644 index 358bbec..0000000 --- a/examples/cpp/simple_send.cpp +++ /dev/null @@ -1,115 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "options.hpp" - -#include <proton/connection.hpp> -#include <proton/connection_options.hpp> -#include <proton/container.hpp> -#include <proton/default_container.hpp> -#include <proton/message.hpp> -#include <proton/message_id.hpp> -#include <proton/messaging_handler.hpp> -#include <proton/thread_safe.hpp> -#include <proton/tracker.hpp> -#include <proton/types.hpp> - -#include <iostream> -#include <map> - -#include "fake_cpp11.hpp" - -class simple_send : public proton::messaging_handler { - private: - std::string url; - std::string user; - std::string password; - proton::sender sender; - int sent; - int confirmed; - int total; - - public: - simple_send(const std::string &s, const std::string &u, const std::string &p, int c) : - url(s), user(u), password(p), sent(0), confirmed(0), total(c) {} - - void on_container_start(proton::container &c) OVERRIDE { - proton::connection_options co; - if (!user.empty()) co.user(user); - if (!password.empty()) co.password(password); - sender = c.open_sender(url, co); - } - - void on_sendable(proton::sender &s) OVERRIDE { - while (s.credit() && sent < total) { - proton::message msg; - std::map<std::string, int> m; - m["sequence"] = sent + 1; - - msg.id(sent + 1); - msg.body(m); - - s.send(msg); - sent++; - } - } - - void on_tracker_accept(proton::tracker &t) OVERRIDE { - confirmed++; - - if (confirmed == total) { - std::cout << "all messages confirmed" << std::endl; - t.connection().close(); - } - } - - void on_transport_close(proton::transport &) OVERRIDE { - sent = confirmed; - } -}; - -int main(int argc, char **argv) { - std::string address("127.0.0.1:5672/examples"); - std::string user; - std::string password; - int message_count = 100; - example::options opts(argc, argv); - - opts.add_value(address, 'a', "address", "connect and send to URL", "URL"); - opts.add_value(message_count, 'm', "messages", "send COUNT messages", "COUNT"); - opts.add_value(user, 'u', "user", "authenticate as USER", "USER"); - 
opts.add_value(password, 'p', "password", "authenticate with PASSWORD", "PASSWORD"); - - try { - opts.parse(); - - simple_send send(address, user, password, message_count); - proton::default_container(send).run(); - - return 0; - } catch (const example::bad_option& e) { - std::cout << opts << std::endl << e.what() << std::endl; - } catch (const std::exception& e) { - std::cerr << e.what() << std::endl; - } - - return 1; -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
