NO-JIRA: c++: broker example clean-up.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c0917a0f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c0917a0f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c0917a0f Branch: refs/heads/go1 Commit: c0917a0fbc74eafa31a07f695c3ced00961cf184 Parents: 4d148e3 Author: Alan Conway <[email protected]> Authored: Wed Nov 4 12:02:51 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Wed Nov 4 12:02:51 2015 -0500 ---------------------------------------------------------------------- examples/cpp/broker.hpp | 12 ----- examples/cpp/select_broker.cpp | 102 ++++++++++++++++++------------------ 2 files changed, 51 insertions(+), 63 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c0917a0f/examples/cpp/broker.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp index 88d2e33..51dcaa3 100644 --- a/examples/cpp/broker.hpp +++ b/examples/cpp/broker.hpp @@ -36,14 +36,6 @@ #include <list> #include <sstream> -bool debug_enabled() { - const char *s = ::getenv("BROKER_DEBUG"); - return s && *s; // Set to non-empty string -} - -/// Debug log messages to stderr. -#define LOG_DEBUG(MSG) do { if (debug_enabled()) { std::cerr << MSG << std::endl; } } while(0) - /** A simple implementation of a queue. */ class queue { public: @@ -52,19 +44,16 @@ class queue { std::string name() const { return name_; } void subscribe(proton::sender &s) { - LOG_DEBUG("queue " << name_ << ": subscribe " << s.name()); consumers_.push_back(s.ptr()); } // Return true if queue can be deleted. bool unsubscribe(proton::sender &s) { - LOG_DEBUG("queue " << name_ << ": unsubscribe " << s.name()); consumers_.remove(s.ptr()); return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0)); } void publish(const proton::message &m, proton::receiver *r) { - LOG_DEBUG("queue " << name_ << ": receive from " << r->name() << " : " << m.body()); messages_.push_back(m); dispatch(0); } @@ -85,7 +74,6 @@ class queue { while (messages_.size()) { if (s->credit()) { const proton::message& m = messages_.front(); - LOG_DEBUG("queue " << name_ << ": send to " << s->name() << " : " << m.body()); s->send(m); messages_.pop_front(); result = true; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c0917a0f/examples/cpp/select_broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/select_broker.cpp b/examples/cpp/select_broker.cpp index a8a8f8f..ade47eb 100644 --- a/examples/cpp/select_broker.cpp +++ b/examples/cpp/select_broker.cpp @@ -22,20 +22,12 @@ #include "proton/engine.hpp" -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -#include <sys/types.h> -#include <sys/socket.h> -#include <netinet/in.h> +#include <sstream> #include <arpa/inet.h> -#include <netdb.h> - #include <sys/select.h> -#include <fcntl.h> +#include <sys/socket.h> +#include <sys/types.h> #include <unistd.h> -#include <err.h> #include <errno.h> template <class T> T check(T result, const std::string& msg=std::string()) { @@ -44,12 +36,9 @@ template <class T> T check(T result, const std::string& msg=std::string()) { return result; } -static void fd_set_if(bool on, int fd, fd_set *fds) { - if (on) - FD_SET(fd, fds); - else - FD_CLR(fd, fds); -} +void fd_set_if(bool on, int fd, fd_set *fds); +int do_listen(uint16_t port); +int do_accept(int listen_fd); class broker { @@ -59,7 +48,6 @@ class broker { broker_handler handler_; engine_map engines_; fd_set reading_, writing_; - int listen_; public: broker() : handler_(queues_) { @@ -74,17 +62,24 @@ class broker { void run(uint16_t port) { - listen(port); + int listen_fd = do_listen(port); + FD_SET(listen_fd, &reading_); while(true) { fd_set readable_set = reading_; fd_set writable_set = writing_; - check(::select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select"); + check(select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select"); for (int fd = 0; fd < FD_SETSIZE; ++fd) { - if (fd == listen_ && FD_ISSET(fd, &readable_set)) - accept(); - + if (fd == listen_fd) { + if (FD_ISSET(listen_fd, &readable_set)) { + int new_fd = do_accept(listen_fd); + engines_[new_fd] = new proton::engine(handler_); + FD_SET(new_fd, &reading_); + FD_SET(new_fd, &writing_); + } + continue; + } if (engines_.find(fd) != engines_.end()) { proton::engine& eng = *engines_[fd]; try { @@ -114,36 +109,10 @@ class broker { private: - void listen(uint16_t port) { - listen_ = check(::socket(PF_INET, SOCK_STREAM, 0), "create listener"); - int yes = 1; - check(::setsockopt(listen_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)), "setsockopt"); - struct sockaddr_in name; - name.sin_family = AF_INET; - name.sin_port = htons (port); - name.sin_addr.s_addr = htonl (INADDR_ANY); - check(::bind(listen_, (struct sockaddr *)&name, sizeof(name)), "bind listener"); - check(::listen(listen_, 32), "listen"); - std::cout << "listening on port " << port << " fd=" << listen_ << std::endl; - FD_SET(listen_, &reading_); - } - - void accept() { - struct sockaddr_in client_addr; - socklen_t size = sizeof(client_addr); - int fd = check(::accept(listen_, (struct sockaddr *)&client_addr, &size), "accept"); - engines_[fd] = new proton::engine(handler_); - FD_SET(fd, &reading_); - FD_SET(fd, &writing_); - std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr) - << ":" << ntohs(client_addr.sin_port) - << " fd=" << fd << std::endl; - } - void readable(int fd, proton::engine& eng) { proton::buffer<char> input = eng.input(); if (input.size()) { - ssize_t n = check(::read(fd, input.begin(), input.size())); + ssize_t n = check(read(fd, input.begin(), input.size())); if (n > 0) { eng.received(n); } else { @@ -155,7 +124,7 @@ class broker { void writable(int fd, proton::engine& eng) { proton::buffer<const char> output = eng.output(); if (output.size()) { - ssize_t n = check(::write(fd, output.begin(), output.size())); + ssize_t n = check(write(fd, output.begin(), output.size())); if (n > 0) eng.sent(n); else { @@ -166,6 +135,37 @@ class broker { }; +void fd_set_if(bool on, int fd, fd_set *fds) { + if (on) + FD_SET(fd, fds); + else + FD_CLR(fd, fds); +} + +int do_listen(uint16_t port) { + int listen_fd = check(socket(PF_INET, SOCK_STREAM, 0), "create listener"); + int yes = 1; + check(setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)), "setsockopt"); + struct sockaddr_in name; + name.sin_family = AF_INET; + name.sin_port = htons (port); + name.sin_addr.s_addr = htonl (INADDR_ANY); + check(bind(listen_fd, (struct sockaddr *)&name, sizeof(name)), "bind listener"); + check(listen(listen_fd, 32), "listen"); + std::cout << "listening on port " << port << " fd=" << listen_fd << std::endl; + return listen_fd; +} + +int do_accept(int listen_fd) { + struct sockaddr_in client_addr; + socklen_t size = sizeof(client_addr); + int fd = check(accept(listen_fd, (struct sockaddr *)&client_addr, &size), "accept"); + std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr) + << ":" << ntohs(client_addr.sin_port) + << " fd=" << fd << std::endl; + return fd; +} + int main(int argc, char **argv) { // Command line options proton::url url("0.0.0.0"); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
