PROTON-1036: c++: engine API for integration with external IO frameworks. proton::engine wraps the pn transport, connection and collector objects and provides a simple bytes-in/bytes-out interface that can be used to integrate proton with any IO framework.
- added proton::engine - added proton::event_loop base class for proton::engine and proton::container. - extracted broker.hpp: common code for example brokers (queue and handler) - added select_broker.cpp example - added url::port_int() to get the integer value of a URL. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/193a7dd5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/193a7dd5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/193a7dd5 Branch: refs/heads/go1 Commit: 193a7dd5d896ce62143b4a52179de067ae491abb Parents: 4f5e18a Author: Alan Conway <[email protected]> Authored: Mon Nov 2 17:56:03 2015 -0500 Committer: Alan Conway <[email protected]> Committed: Mon Nov 2 18:14:08 2015 -0500 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 6 + examples/cpp/broker.cpp | 171 ++------------- examples/cpp/broker.hpp | 213 +++++++++++++++++++ examples/cpp/example_test.py | 3 +- examples/cpp/select_broker.cpp | 186 ++++++++++++++++ proton-c/bindings/cpp/CMakeLists.txt | 1 + proton-c/bindings/cpp/README.md | 2 - proton-c/bindings/cpp/docs/mainpage.md | 6 + .../bindings/cpp/include/proton/connection.hpp | 21 +- .../bindings/cpp/include/proton/container.hpp | 4 +- proton-c/bindings/cpp/include/proton/engine.hpp | 155 ++++++++++++++ proton-c/bindings/cpp/include/proton/event.hpp | 9 +- .../bindings/cpp/include/proton/event_loop.hpp | 41 ++++ proton-c/bindings/cpp/include/proton/types.hpp | 3 - proton-c/bindings/cpp/include/proton/url.hpp | 4 + proton-c/bindings/cpp/src/connection.cpp | 9 +- proton-c/bindings/cpp/src/connector.cpp | 2 +- proton-c/bindings/cpp/src/container_impl.cpp | 2 +- proton-c/bindings/cpp/src/engine.cpp | 132 ++++++++++++ proton-c/bindings/cpp/src/event.cpp | 9 + proton-c/bindings/cpp/src/messaging_event.cpp | 6 +- proton-c/bindings/cpp/src/messaging_event.hpp | 2 +- 
proton-c/bindings/cpp/src/proton_event.cpp | 25 ++- proton-c/bindings/cpp/src/proton_event.hpp | 109 +++++----- proton-c/bindings/cpp/src/url.cpp | 15 +- 25 files changed, 901 insertions(+), 235 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 34edb83..d5925b8 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -32,6 +32,7 @@ foreach(example direct_send sync_client client + select_broker server server_direct recurring_timer @@ -54,3 +55,8 @@ endif(WIN32) add_test(NAME cpp_example_test COMMAND ${PYTHON_EXECUTABLE} ${env_py} -- "PATH=${test_path}" "PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} -m unittest -v example_test) + +set(broker_tests example_test.ExampleTest.test_request_response example_test.ExampleTest.test_simple_send_recv) + +add_test(NAME cpp_example_select_test + COMMAND ${PYTHON_EXECUTABLE} ${env_py} -- "PATH=${test_path}" "PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR}" "TEST_BROKER=select_broker" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} -m unittest -v ${broker_tests}) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp index 080bc9c..a98d43d 100644 --- a/examples/cpp/broker.cpp +++ b/examples/cpp/broker.cpp @@ -20,177 +20,42 @@ */ #include "options.hpp" +#include "broker.hpp" #include "proton/acceptor.hpp" #include "proton/container.hpp" -#include "proton/messaging_handler.hpp" -#include "proton/url.hpp" #include "proton/value.hpp" #include <iostream> -#include <sstream> #include <deque> #include <map> #include <list> #include <string> -class queue { +class broker { 
public: - bool dynamic; - typedef std::deque<proton::message> message_queue; - typedef std::list<proton::counted_ptr<proton::sender> > sender_list; - message_queue messages; - sender_list consumers; + broker(const proton::url& url) : handler_(url, queues_) {} - queue(bool dyn = false) : dynamic(dyn) {} + proton::messaging_handler& handler() { return handler_; } - void subscribe(proton::sender &c) { - consumers.push_back(c.ptr()); - } - - bool unsubscribe(proton::sender &c) { - consumers.remove(c.ptr()); - return (consumers.size() == 0 && (dynamic || messages.size() == 0)); - } - - void publish(const proton::message &m) { - messages.push_back(m); - dispatch(0); - } - - void dispatch(proton::sender *s) { - while (deliver_to(s)) {} - } - - bool deliver_to(proton::sender *consumer) { - // deliver to single consumer if supplied, else all consumers - int count = consumer ? 1 : consumers.size(); - if (!count) return false; - bool result = false; - sender_list::iterator it = consumers.begin(); - if (!consumer && count) - consumer = it->get(); - - while (messages.size()) { - if (consumer->credit()) { - consumer->send(messages.front()); - messages.pop_front(); - result = true; - } - if (--count) - it++; - else - return result; - } - return false; - } -}; - -class broker : public proton::messaging_handler { private: - typedef std::map<std::string, queue *> queue_map; - proton::url url; - queue_map queues; - uint64_t queue_count; // Use to generate unique queue IDs. 
- - public: - - broker(const proton::url &u) : url(u), queue_count(0) {} - void on_start(proton::event &e) { - e.container().listen(url); - std::cout << "broker listening on " << url << std::endl; - } + class my_handler : public broker_handler { + public: + my_handler(const proton::url& u, queues& qs) : broker_handler(qs), url_(u) {} - class queue &get_queue(std::string &address) { - queue_map::iterator it = queues.find(address); - if (it == queues.end()) { - queues[address] = new queue(); - return *queues[address]; - } - else { - return *it->second; + void on_start(proton::event &e) { + e.container().listen(url_); + std::cout << "broker listening on " << url_ << std::endl; } - } - std::string queue_name() { - std::ostringstream os; - os << "q" << queue_count++; - return os.str(); - } + private: + const proton::url& url_; + }; - void on_link_opening(proton::event &e) { - proton::link& lnk = e.link(); - if (lnk.sender()) { - proton::terminus &remote_source(lnk.remote_source()); - if (remote_source.dynamic()) { - std::string address = queue_name(); - lnk.source().address(address); - queue *q = new queue(true); - queues[address] = q; - q->subscribe(*lnk.sender()); - std::cout << "broker dynamic outgoing link from " << address << std::endl; - } - else { - std::string address = remote_source.address(); - if (!address.empty()) { - lnk.source().address(address); - get_queue(address).subscribe(*lnk.sender()); - std::cout << "broker outgoing link from " << address << std::endl; - } - } - } - else { - std::string address = lnk.remote_target().address(); - if (!address.empty()) - lnk.target().address(address); - std::cout << "broker incoming link to " << address << std::endl; - } - } - - void unsubscribe (proton::sender &lnk) { - std::string address = lnk.source().address(); - queue_map::iterator it = queues.find(address); - if (it != queues.end() && it->second->unsubscribe(lnk)) { - delete it->second; - queues.erase(it); - } - } - - void on_link_closing(proton::event &e) { - 
proton::link &lnk = e.link(); - if (lnk.sender()) { - unsubscribe(*lnk.sender()); - } - } - - void on_connection_closing(proton::event &e) { - remove_stale_consumers(e.connection()); - } - - void on_disconnected(proton::event &e) { - remove_stale_consumers(e.connection()); - } - - void remove_stale_consumers(proton::connection &connection) { - proton::link_range r = connection.find_links(proton::endpoint::REMOTE_ACTIVE); - for (proton::link_iterator l = r.begin(); l != r.end(); ++l) { - if (l->sender()) { - unsubscribe(*l->sender()); - } - } - } - - void on_sendable(proton::event &e) { - proton::link& lnk = e.link(); - std::string addr = lnk.source().address(); - get_queue(addr).dispatch(lnk.sender()); - } - - void on_message(proton::event &e) { - std::string addr = e.link().target().address(); - get_queue(addr).publish(e.message()); - } + private: + queues queues_; + my_handler handler_; }; int main(int argc, char **argv) { @@ -200,8 +65,8 @@ int main(int argc, char **argv) { opts.add_value(url, 'a', "address", "listen on URL", "URL"); try { opts.parse(); - broker broker(url); - proton::container(broker).run(); + broker b(url); + proton::container(b.handler()).run(); return 0; } catch (const bad_option& e) { std::cout << opts << std::endl << e.what() << std::endl; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/broker.hpp ---------------------------------------------------------------------- diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp new file mode 100644 index 0000000..88d2e33 --- /dev/null +++ b/examples/cpp/broker.hpp @@ -0,0 +1,213 @@ +#ifndef BROKER_HPP +#define BROKER_HPP +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/**@file + * + * Common code used by different broker examples. + * + * The examples add functionality as needed, this helps to make it easier to see + * the important differences between the examples. + */ + +#include "proton/messaging_handler.hpp" +#include "proton/url.hpp" + +#include <iostream> +#include <deque> +#include <map> +#include <list> +#include <sstream> + +bool debug_enabled() { + const char *s = ::getenv("BROKER_DEBUG"); + return s && *s; // Set to non-empty string +} + +/// Debug log messages to stderr. +#define LOG_DEBUG(MSG) do { if (debug_enabled()) { std::cerr << MSG << std::endl; } } while(0) + +/** A simple implementation of a queue. */ +class queue { + public: + queue(const std::string &name, bool dynamic = false) : name_(name), dynamic_(dynamic) {} + + std::string name() const { return name_; } + + void subscribe(proton::sender &s) { + LOG_DEBUG("queue " << name_ << ": subscribe " << s.name()); + consumers_.push_back(s.ptr()); + } + + // Return true if queue can be deleted. 
+ bool unsubscribe(proton::sender &s) { + LOG_DEBUG("queue " << name_ << ": unsubscribe " << s.name()); + consumers_.remove(s.ptr()); + return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0)); + } + + void publish(const proton::message &m, proton::receiver *r) { + LOG_DEBUG("queue " << name_ << ": receive from " << r->name() << " : " << m.body()); + messages_.push_back(m); + dispatch(0); + } + + void dispatch(proton::sender *s) { + while (deliver_to(s)) {} + } + + bool deliver_to(proton::sender *s) { + // deliver to single sender if supplied, else all consumers + int count = s ? 1 : consumers_.size(); + if (!count) return false; + bool result = false; + sender_list::iterator it = consumers_.begin(); + if (!s && count) + s = it->get(); + + while (messages_.size()) { + if (s->credit()) { + const proton::message& m = messages_.front(); + LOG_DEBUG("queue " << name_ << ": send to " << s->name() << " : " << m.body()); + s->send(m); + messages_.pop_front(); + result = true; + } + if (--count) + it++; + else + return result; + } + return false; + } + + private: + typedef std::deque<proton::message> message_queue; + typedef std::list<proton::counted_ptr<proton::sender> > sender_list; + + std::string name_; + bool dynamic_; + message_queue messages_; + sender_list consumers_; +}; + +/** A collection of queues and queue factory, used by a broker */ +class queues { + public: + queues() : next_id_(0) {} + + // Get or create a queue. + virtual queue &get(const std::string &address = std::string()) { + if (address.empty()) throw std::runtime_error("empty queue name"); + queue*& q = queues_[address]; + if (!q) q = new queue(address); + return *q; + } + + // Create a dynamic queue with a unique name. 
+ virtual queue &dynamic() { + std::ostringstream os; + os << "q" << next_id_++; + queue *q = queues_[os.str()] = new queue(os.str(), true); + return *q; + } + + // Delete the named queue + virtual void erase(std::string &name) { + delete queues_[name]; + queues_.erase(name); + } + + protected: + typedef std::map<std::string, queue *> queue_map; + queue_map queues_; + uint64_t next_id_; // Use to generate unique queue IDs. +}; + + +/** Common handler logic for brokers. */ +class broker_handler : public proton::messaging_handler { + public: + broker_handler(queues& qs) : queues_(qs) {} + + void on_link_opening(proton::event &e) { + proton::link& lnk = e.link(); + if (lnk.sender()) { + proton::terminus &remote_source(lnk.remote_source()); + queue &q = remote_source.dynamic() ? + queues_.dynamic() : queues_.get(remote_source.address()); + lnk.source().address(q.name()); + q.subscribe(*lnk.sender()); + std::cout << "broker outgoing link from " << q.name() << std::endl; + } + else { // Receiver + std::string address = lnk.remote_target().address(); + if (!address.empty()) { + lnk.target().address(address); + std::cout << "broker incoming link to " << address << std::endl; + } + } + } + + void unsubscribe (proton::sender &lnk) { + std::string address = lnk.source().address(); + if (queues_.get(address).unsubscribe(lnk)) + queues_.erase(address); + } + + void on_link_closing(proton::event &e) { + proton::link &lnk = e.link(); + if (lnk.sender()) + unsubscribe(*lnk.sender()); + } + + void on_connection_closing(proton::event &e) { + remove_stale_consumers(e.connection()); + } + + void on_disconnected(proton::event &e) { + remove_stale_consumers(e.connection()); + } + + void remove_stale_consumers(proton::connection &connection) { + proton::link_range r = connection.find_links(proton::endpoint::REMOTE_ACTIVE); + for (proton::link_iterator l = r.begin(); l != r.end(); ++l) { + if (l->sender()) + unsubscribe(*l->sender()); + } + } + + void on_sendable(proton::event &e) { + 
proton::link& lnk = e.link(); + std::string address = lnk.source().address(); + queues_.get(address).dispatch(lnk.sender()); + } + + void on_message(proton::event &e) { + std::string address = e.link().target().address(); + queues_.get(address).publish(e.message(), e.link().receiver()); + } + + protected: + queues& queues_; +}; + + +#endif // BROKER_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/example_test.py ---------------------------------------------------------------------- diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py index d2b3169..3ed8b9c 100644 --- a/examples/cpp/example_test.py +++ b/examples/cpp/example_test.py @@ -97,8 +97,9 @@ class Broker(object): cls._broker = None def __init__(self): + broker_exe = os.environ.get("TEST_BROKER") or "broker" self.addr = pick_addr() - cmd = cmdline("broker", "-a", self.addr) + cmd = cmdline(broker_exe, "-a", self.addr) try: self.process = Popen(cmd, stdout=NULL, stderr=sys.stderr) wait_addr(self.addr) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/select_broker.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/select_broker.cpp b/examples/cpp/select_broker.cpp new file mode 100644 index 0000000..49cc2dc --- /dev/null +++ b/examples/cpp/select_broker.cpp @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "options.hpp" +#include "broker.hpp" + +#include "proton/engine.hpp" + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <sys/types.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> + +#include <sys/select.h> +#include <fcntl.h> +#include <unistd.h> +#include <err.h> +#include <errno.h> + +template <class T> T check(T result, const std::string& msg=std::string()) { + // Note strerror is thread unsafe, this example is single-threaded. + if (result < 0) throw std::runtime_error(msg + ": " + strerror(errno)); + return result; +} + +static void fd_set_if(bool on, int fd, fd_set *fds) { + if (on) + FD_SET(fd, fds); + else + FD_CLR(fd, fds); +} + +class broker { + + typedef std::map<int, proton::engine*> engine_map; + + queues queues_; + broker_handler handler_; + engine_map engines_; + fd_set reading_, writing_; + int listen_; + + public: + broker() : handler_(queues_) { + FD_ZERO(&reading_); + FD_ZERO(&writing_); + } + + ~broker() { + for (engine_map::iterator i = engines_.begin(); i != engines_.end(); ++i) + delete i->second; + } + + void run(uint16_t port) { + + listen(port); + + while(true) { + fd_set readable_set = reading_; + fd_set writable_set = writing_; + + check(::select(FD_SETSIZE, &readable_set, &writable_set, NULL, NULL), "select"); + for (int fd = 0; fd < FD_SETSIZE; ++fd) { + if (fd == listen_ && FD_ISSET(fd, &readable_set)) + accept(); + + if (engines_.find(fd) != engines_.end()) { + proton::engine& eng = *engines_[fd]; + try { + if (FD_ISSET(fd, 
&readable_set)) + readable(fd, eng); + + if (FD_ISSET(fd, &writable_set)) + writable(fd, eng); + } catch (const std::exception& e) { + std::cout << e.what() << " fd=" << fd << std::endl; + eng.close_input(); + eng.close_output(); + } + // Set reading/writing bits for next time around + fd_set_if(eng.input().size(), fd, &reading_); + fd_set_if(eng.output().size(), fd, &writing_); + + if (eng.closed()) { + ::close(fd); + delete engines_[fd]; + engines_.erase(fd); + } + } + } + } + } + + private: + + void listen(uint16_t port) { + listen_ = check(::socket(PF_INET, SOCK_STREAM, 0), "create listener"); + int yes = 1; + check(::setsockopt(listen_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(int)), "setsockopt"); + struct sockaddr_in name; + name.sin_family = AF_INET; + name.sin_port = htons (port); + name.sin_addr.s_addr = htonl (INADDR_ANY); + check(::bind(listen_, (struct sockaddr *)&name, sizeof(name)), "bind listener"); + check(::listen(listen_, 32), "listen"); + std::cout << "listening on port " << port << " fd=" << listen_ << std::endl; + FD_SET(listen_, &reading_); + } + + void accept() { + struct sockaddr_in client_addr; + socklen_t size = sizeof(client_addr); + int fd = check(::accept(listen_, (struct sockaddr *)&client_addr, &size), "accept"); + engines_[fd] = new proton::engine(handler_); + FD_SET(fd, &reading_); + FD_SET(fd, &writing_); + std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr) + << ":" << ::ntohs(client_addr.sin_port) + << " fd=" << fd << std::endl; + } + + void readable(int fd, proton::engine& eng) { + proton::buffer<char> input = eng.input(); + if (input.size()) { + ssize_t n = check(::read(fd, input.begin(), input.size())); + if (n > 0) { + eng.received(n); + } else { + eng.close_input(); + } + } + } + + void writable(int fd, proton::engine& eng) { + proton::buffer<const char> output = eng.output(); + if (output.size()) { + ssize_t n = check(::write(fd, output.begin(), output.size())); + if (n > 0) + eng.sent(n); + else { + 
eng.close_output(); + } + } + } + +}; + +int main(int argc, char **argv) { + // Command line options + proton::url url("0.0.0.0"); + options opts(argc, argv); + opts.add_value(url, 'a', "address", "listen on URL", "URL"); + try { + opts.parse(); + broker().run(url.port_int()); + return 0; + } catch (const bad_option& e) { + std::cout << opts << std::endl << e.what() << std::endl; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} + + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index 0c44fb9..3995c20 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -43,6 +43,7 @@ set(qpid-proton-cpp-source src/duration.cpp src/encoder.cpp src/endpoint.cpp + src/engine.cpp src/error.cpp src/event.cpp src/facade.cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/README.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/README.md b/proton-c/bindings/cpp/README.md index d84813b..c89534e 100644 --- a/proton-c/bindings/cpp/README.md +++ b/proton-c/bindings/cpp/README.md @@ -17,8 +17,6 @@ Tests Bugs - Error handling: - examples exit silently on broker exit/not running, core on no-such-queue (e.g. with qpidd) - - TODO/FIXME notes in code. -- const correctness: consistent use of const where semantically appropriate in C++ APIs. Features - SASL/SSL support with interop tests. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/docs/mainpage.md ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/docs/mainpage.md b/proton-c/bindings/cpp/docs/mainpage.md index b370b13..9129307 100644 --- a/proton-c/bindings/cpp/docs/mainpage.md +++ b/proton-c/bindings/cpp/docs/mainpage.md @@ -98,6 +98,12 @@ complex AMQP types. For details on converting between AMQP and C++ data types see the \ref encode_decode.cpp example and classes `proton::encoder`, `proton::decoder` and `proton::value`. +`proton::engine` provides fewer facilities than `proton::container` but makes +fewer (no) assumptions about application threading and IO. It may be easier to +integrate with an existing IO framework, for multi-threaded applications or for +non-socket IO. See the \ref select_broker.cpp example. The main application and +AMQP logic is implemented the same way using the engine or the container. + Delivery Guarantees ------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/connection.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp index cacf004..68d46a3 100644 --- a/proton-c/bindings/cpp/include/proton/connection.hpp +++ b/proton-c/bindings/cpp/include/proton/connection.hpp @@ -32,17 +32,28 @@ struct pn_connection_t; namespace proton { class handler; -class transport; +class engine; /** connection to a remote AMQP peer. */ class connection : public counted_facade<pn_connection_t, connection, endpoint> { public: - ///@name getters @{ - PN_CPP_EXTERN class transport& transport() const; - PN_CPP_EXTERN class container& container() const; + + /// Get the event_loop, can be a container or an engine.
+ PN_CPP_EXTERN class event_loop &event_loop() const; + + /// Get the container, throw an exception if event_loop is not a container. + PN_CPP_EXTERN class container &container() const; + + /// Get the engine, throw an exception if event_loop is not an engine. + PN_CPP_EXTERN class engine &engine() const; + + /// Return the AMQP host name for the connection. PN_CPP_EXTERN std::string host() const; - ///@} + + /// Return the container-ID for the connection. All connections have a container_id, + /// even if they don't have a container event_loop. + PN_CPP_EXTERN std::string container_id() const; /** Initiate local open, not complete till messaging_handler::on_connection_opened() * or proton_handler::on_connection_remote_open() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index 059416f..de91505 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -23,6 +23,7 @@ */ #include "proton/duration.hpp" #include "proton/export.hpp" +#include "proton/event_loop.hpp" #include "proton/pn_unique_ptr.hpp" #include "proton/reactor.hpp" #include "proton/url.hpp" @@ -47,8 +48,7 @@ class container_impl; * Note that by default, links belonging to the container have generated link-names * of the form */ -class container -{ +class container : public event_loop { public: /// Container ID should be unique within your system. By default a random ID is generated.
PN_CPP_EXTERN container(const std::string& id=std::string()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/engine.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/engine.hpp b/proton-c/bindings/cpp/include/proton/engine.hpp new file mode 100644 index 0000000..672e911 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/engine.hpp @@ -0,0 +1,155 @@ +#ifndef ENGINE_HPP +#define ENGINE_HPP +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proton/pn_unique_ptr.hpp" +#include "proton/export.hpp" +#include "proton/event_loop.hpp" + +#include <cstddef> +#include <utility> + +namespace proton { + +class handler; +class connection; + +/// Pointers to a data buffer. 
+template <class T> class buffer { + public: + explicit buffer(T* begin__=0, T* end__=0) : begin_(begin__), end_(end__) {} + explicit buffer(T* ptr, size_t n) : begin_(ptr), end_(ptr + n) {} + T* begin() const { return begin_; } + T* end() const { return end_; } + size_t size() const { return end() - begin(); } + bool empty() const { return !size(); } + + private: + T* begin_; + T* end_; +}; + +/** + * An engine is an event_loop that manages a single AMQP connection. It is + * useful for integrating AMQP into an existing IO framework. + * + * The engine provides a simple "bytes-in/bytes-out" interface. Incoming AMQP + * bytes from any kind of data connection are fed into the engine and processed + * to dispatch events to a proton::handler. The resulting AMQP output data is + * available from the engine and can be sent back over the connection. + * + * The engine does no IO of its own. It assumes a two-way flow of bytes over + * some externally-managed "connection". The "connection" could be a socket + * managed by select, poll, epoll or some other mechanism, or it could be + * something else such as an RDMA connection, a shared-memory buffer or a Unix + * pipe. + * + * The engine is an alternative event_loop to the proton::container. The + * container is easier to use in single-threaded, stand-alone applications that + * want to use standard socket connections. The engine can be embedded into + * any existing IO framework for any type of IO. + * + * The application is coded the same way for engine or container: you implement + * proton::handler. Handlers attached to an engine will receive transport, + * connection, session, link and message events. They will not receive reactor, + * selectable or timer events, the engine assumes those are managed externally. + * + * THREAD SAFETY: A single engine instance cannot be called concurrently, but + * different engine instances can be processed concurrently in separate threads.
+ */ +class engine : public event_loop { + public: + + // TODO aconway 2015-11-02: engine() take connection-options. + // TODO aconway 2015-11-02: engine needs to accept application events. + // TODO aconway 2015-11-02: generalize reconnect logic for container and engine. + + /** + * Create an engine that will advertise id as the AMQP container-id for its connection. + */ + PN_CPP_EXTERN engine(handler&, const std::string& id=std::string()); + + PN_CPP_EXTERN ~engine(); + + /** + * Input buffer. If input.size() == 0 means no input can be accepted right now, but + * sending output might free up space. + */ + PN_CPP_EXTERN buffer<char> input(); + + /** + * Process n bytes from input(). Calls handler functions for the AMQP events + * encoded by the received data. + * + * After the call the input() and output() buffers may have changed. + */ + PN_CPP_EXTERN void received(size_t n); + + /** + * Indicate that no more input data is available from the external + * connection. May call handler functions. + */ + PN_CPP_EXTERN void close_input(); + + /** + * Output buffer. Send data from this buffer and call sent() to indicate it + * has been sent. If output().size() == 0 there is no data to send, + * receiving more input data might make output data available. + */ + PN_CPP_EXTERN buffer<const char> output(); + + /** + * Call when the first n bytes from the start of the output buffer have been + * sent. May call handler functions. + * + * After the call the input() and output() buffers may have changed. + */ + PN_CPP_EXTERN void sent(size_t n); + + /** + * Indicate that no more output data can be sent to the external connection. + * May call handler functions. + */ + PN_CPP_EXTERN void close_output(); + + /** + * True if engine is closed. This can either be because close_input() and + * close_output() were called to indicate the external connection has + * closed, or because AMQP close frames were received in the AMQP data. 
In + * either case no more input() buffer space or output() data will be + * available for this engine. + */ + PN_CPP_EXTERN bool closed() const; + + /** The AMQP connection associated with this engine. */ + PN_CPP_EXTERN class connection& connection() const; + + /** The AMQP container-id associated with this engine. */ + PN_CPP_EXTERN std::string id() const; + + private: + void run(); + + struct impl; + pn_unique_ptr<impl> impl_; +}; + +} +#endif // ENGINE_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/event.hpp b/proton-c/bindings/cpp/include/proton/event.hpp index 6fb0c94..ed310f1 100644 --- a/proton-c/bindings/cpp/include/proton/event.hpp +++ b/proton-c/bindings/cpp/include/proton/event.hpp @@ -45,8 +45,15 @@ class event { /// Return the name of the event type virtual PN_CPP_EXTERN std::string name() const = 0; - /// Get container. + /// Get the event_loop object, can be a container or an engine. + virtual PN_CPP_EXTERN class event_loop &event_loop() const; + + /// Get the container, throw an exception if event_loop is not a container. virtual PN_CPP_EXTERN class container &container() const; + + /// Get the engine, throw an exception if event_loop is not an engine. + virtual PN_CPP_EXTERN class engine &engine() const; + /// Get connection. virtual PN_CPP_EXTERN class connection &connection() const; /// Get sender @throws error if no sender.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/event_loop.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/event_loop.hpp b/proton-c/bindings/cpp/include/proton/event_loop.hpp new file mode 100644 index 0000000..e5dff37 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/event_loop.hpp @@ -0,0 +1,41 @@ +#ifndef EVENT_LOOP_HPP +#define EVENT_LOOP_HPP +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proton/export.hpp" + +#include <string> + +/** + * An event_loop dispatches events to event handlers. event_loop is an abstract + * class, concrete subclasses are proton::container and proton::engine. +*/ +class event_loop { + public: + PN_CPP_EXTERN virtual ~event_loop() {} + + /// The AMQP container-id associated with this event loop. + /// Any connections managed by this event loop will have this container id. + PN_CPP_EXTERN virtual std::string id() const = 0; + + // TODO aconway 2015-11-02: injecting application events. 
+}; + +#endif // EVENT_LOOP_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/types.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/types.hpp b/proton-c/bindings/cpp/include/proton/types.hpp index b1e7085..0f82ae3 100644 --- a/proton-c/bindings/cpp/include/proton/types.hpp +++ b/proton-c/bindings/cpp/include/proton/types.hpp @@ -121,9 +121,6 @@ struct amqp_binary : public std::string { operator pn_bytes_t() const { return pn_bytes(*this); } }; -// TODO aconway 2015-06-11: alternative representation of variable-length data -// as pointer to existing buffer. - /// Template for opaque proton proton types that can be treated as byte arrays. template <class P> struct opaque: public comparable<opaque<P> > { P value; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/url.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/url.hpp b/proton-c/bindings/cpp/include/proton/url.hpp index 42abadd..e8c68aa 100644 --- a/proton-c/bindings/cpp/include/proton/url.hpp +++ b/proton-c/bindings/cpp/include/proton/url.hpp @@ -20,6 +20,7 @@ */ #include "proton/facade.hpp" +#include "proton/types.hpp" #include "proton/error.hpp" #include <iosfwd> @@ -89,6 +90,9 @@ class url { PN_CPP_EXTERN std::string host() const; /** port is a string, it can be a number or a symbolic name like "amqp" */ PN_CPP_EXTERN std::string port() const; + /** port_int is the numeric value of the port. 
*/ + PN_CPP_EXTERN uint16_t port_int() const; + /** path is everything after the final "/" */ PN_CPP_EXTERN std::string path() const; //@} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp index 1b35e03..2d401bd 100644 --- a/proton-c/bindings/cpp/src/connection.cpp +++ b/proton-c/bindings/cpp/src/connection.cpp @@ -37,10 +37,6 @@ namespace proton { -transport &connection::transport() const { - return *transport::cast(pn_connection_transport(pn_cast(this))); -} - void connection::open() { pn_connection_open(pn_cast(this)); } void connection::close() { pn_connection_close(pn_cast(this)); } @@ -49,6 +45,11 @@ std::string connection::host() const { return std::string(pn_connection_get_hostname(pn_cast(this))); } +std::string connection::container_id() const { + const char* id = pn_connection_get_container(pn_cast(this)); + return id ? 
std::string(id) : std::string(); +} + container& connection::container() const { return container_context(pn_object_reactor(pn_cast(this))); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/connector.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connector.cpp b/proton-c/bindings/cpp/src/connector.cpp index 49660ea..3333441 100644 --- a/proton-c/bindings/cpp/src/connector.cpp +++ b/proton-c/bindings/cpp/src/connector.cpp @@ -40,7 +40,7 @@ void connector::address(const url &a) { void connector::connect() { pn_connection_t *conn = pn_cast(connection_); - pn_connection_set_container(conn, connection_->container().id().c_str()); + pn_connection_set_container(conn, connection_->container_id().c_str()); pn_connection_set_hostname(conn, address_.host_port().c_str()); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp index fc97a3e..6dfd797 100644 --- a/proton-c/bindings/cpp/src/container_impl.cpp +++ b/proton-c/bindings/cpp/src/container_impl.cpp @@ -63,7 +63,7 @@ struct handler_context { static void dispatch(pn_handler_t *c_handler, pn_event_t *c_event, pn_event_type_t type) { handler_context& hc(handler_context::get(c_handler)); - messaging_event mevent(c_event, type, *hc.container_); + messaging_event mevent(c_event, type, hc.container_); mevent.dispatch(*hc.handler_); return; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/engine.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/engine.cpp b/proton-c/bindings/cpp/src/engine.cpp new file mode 100644 index 0000000..61d4040 --- /dev/null +++ 
b/proton-c/bindings/cpp/src/engine.cpp @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proton/engine.hpp" +#include "proton/error.hpp" + +#include "uuid.hpp" +#include "proton_bits.hpp" +#include "messaging_event.hpp" + +#include <proton/connection.h> +#include <proton/transport.h> +#include <proton/event.h> + +namespace proton { + +struct engine::impl { + + impl(class handler& h, pn_transport_t *t) : + handler(h), transport(t), connection(pn_connection()), collector(pn_collector()) + {} + + ~impl() { + pn_transport_free(transport); + pn_connection_free(connection); + pn_collector_free(collector); + } + + void check(int err, const std::string& msg) { + if (err) + throw proton::error(msg + error_str(pn_transport_error(transport), err)); + } + + pn_event_t *peek() { return pn_collector_peek(collector); } + void pop() { pn_collector_pop(collector); } + + class handler& handler; + pn_transport_t *transport; + pn_connection_t *connection; + pn_collector_t * collector; +}; + +engine::engine(handler &h, const std::string& id_) : impl_(new impl(h, pn_transport())) { + if (!impl_->transport || !impl_->connection || !impl_->collector) + throw error("engine setup failed"); + std::string 
id = id_.empty() ? uuid().str() : id_; + pn_connection_set_container(impl_->connection, id.c_str()); + impl_->check(pn_transport_bind(impl_->transport, impl_->connection), "engine bind: "); + pn_connection_collect(impl_->connection, impl_->collector); +} + +engine::~engine() {} + +buffer<char> engine::input() { + ssize_t n = pn_transport_capacity(impl_->transport); + if (n <= 0) + return buffer<char>(); + return buffer<char>(pn_transport_tail(impl_->transport), n); +} + +void engine::close_input() { + pn_transport_close_tail(impl_->transport); + run(); +} + +void engine::received(size_t n) { + impl_->check(pn_transport_process(impl_->transport, n), "engine process: "); + run(); +} + +void engine::run() { + for (pn_event_t *e = impl_->peek(); e; e = impl_->peek()) { + switch (pn_event_type(e)) { + case PN_CONNECTION_REMOTE_CLOSE: + pn_transport_close_tail(impl_->transport); + break; + case PN_CONNECTION_LOCAL_CLOSE: + pn_transport_close_head(impl_->transport); + break; + default: + break; + } + messaging_event mevent(e, pn_event_type(e), this); + mevent.dispatch(impl_->handler); + impl_->pop(); + } +} + +buffer<const char> engine::output() { + ssize_t n = pn_transport_pending(impl_->transport); + if (n <= 0) + return buffer<const char>(); + return buffer<const char>(pn_transport_head(impl_->transport), n); +} + +void engine::sent(size_t n) { + pn_transport_pop(impl_->transport, n); + run(); +} + +void engine::close_output() { + pn_transport_close_head(impl_->transport); + run(); +} + +bool engine::closed() const { + return pn_transport_closed(impl_->transport); +} + +class connection& engine::connection() const { + return *connection::cast(impl_->connection); +} + +std::string engine::id() const { return connection().container_id(); } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/event.cpp 
b/proton-c/bindings/cpp/src/event.cpp index d79df33..a732cc4 100644 --- a/proton-c/bindings/cpp/src/event.cpp +++ b/proton-c/bindings/cpp/src/event.cpp @@ -38,11 +38,20 @@ event::event() {} event::~event() {} +event_loop &event::event_loop() const { + throw error(MSG("No event_loop context for event")); +} + container &event::container() const { // Subclasses to override as appropriate throw error(MSG("No container context for event")); } +engine &event::engine() const { + // Subclasses to override as appropriate + throw error(MSG("No engine context for event")); +} + connection &event::connection() const { throw error(MSG("No connection context for event")); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/messaging_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_event.cpp b/proton-c/bindings/cpp/src/messaging_event.cpp index 3e59b1c..3a0514d 100644 --- a/proton-c/bindings/cpp/src/messaging_event.cpp +++ b/proton-c/bindings/cpp/src/messaging_event.cpp @@ -40,12 +40,12 @@ namespace proton { -messaging_event::messaging_event(pn_event_t *ce, proton_event::event_type t, class container &c) : - proton_event(ce, t, c), type_(messaging_event::PROTON), parent_event_(0), message_(0) +messaging_event::messaging_event(pn_event_t *ce, proton_event::event_type t, class event_loop *el) : + proton_event(ce, t, el), type_(messaging_event::PROTON), parent_event_(0), message_(0) {} messaging_event::messaging_event(event_type t, proton_event &p) : - proton_event(NULL, PN_EVENT_NONE, p.container()), type_(t), parent_event_(&p), message_(0) + proton_event(NULL, PN_EVENT_NONE, &p.event_loop()), type_(t), parent_event_(&p), message_(0) { if (type_ == messaging_event::PROTON) throw error(MSG("invalid messaging event type")); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/messaging_event.hpp 
---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_event.hpp b/proton-c/bindings/cpp/src/messaging_event.hpp index 84cf3f3..f97d1a5 100644 --- a/proton-c/bindings/cpp/src/messaging_event.hpp +++ b/proton-c/bindings/cpp/src/messaging_event.hpp @@ -83,7 +83,7 @@ class messaging_event : public proton_event TRANSPORT_CLOSED }; - messaging_event(pn_event_t *ce, proton_event::event_type t, class container &c); + messaging_event(pn_event_t *, proton_event::event_type, class event_loop *); messaging_event(event_type t, proton_event &parent); ~messaging_event(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/proton_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_event.cpp b/proton-c/bindings/cpp/src/proton_event.cpp index 649d908..068d245 100644 --- a/proton-c/bindings/cpp/src/proton_event.cpp +++ b/proton-c/bindings/cpp/src/proton_event.cpp @@ -24,6 +24,7 @@ #include "proton/link.h" #include "proton/container.hpp" +#include "proton/engine.hpp" #include "proton/delivery.hpp" #include "proton/error.hpp" #include "proton_event.hpp" @@ -36,10 +37,10 @@ namespace proton { -proton_event::proton_event(pn_event_t *ce, proton_event::event_type t, class container &c) : +proton_event::proton_event(pn_event_t *ce, proton_event::event_type t, class event_loop *el) : pn_event_(ce), type_(t), - container_(c) + event_loop_(el) {} int proton_event::type() const { return type_; } @@ -48,7 +49,25 @@ std::string proton_event::name() const { return pn_event_type_name(pn_event_type pn_event_t *proton_event::pn_event() const { return pn_event_; } -container &proton_event::container() const { return container_; } +event_loop &proton_event::event_loop() const { + if (!event_loop_) + throw error(MSG("No event_loop context for this event")); + return *event_loop_; +} + +container &proton_event::container() 
const { + class container *c = dynamic_cast<class container*>(event_loop_); + if (!c) + throw error(MSG("No container context for this event")); + return *c; +} + +engine &proton_event::engine() const { + class engine *e = dynamic_cast<class engine*>(event_loop_); + if (!e) + throw error(MSG("No engine context for this event")); + return *e; +} connection &proton_event::connection() const { pn_connection_t *conn = pn_event_connection(pn_event()); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/proton_event.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_event.hpp b/proton-c/bindings/cpp/src/proton_event.hpp index 24ca5e4..e382d8c 100644 --- a/proton-c/bindings/cpp/src/proton_event.hpp +++ b/proton-c/bindings/cpp/src/proton_event.hpp @@ -50,182 +50,182 @@ class proton_event : public event * Defined as a programming convenience. No event of this type will * ever be generated. */ - PN_CPP_EXTERN static const event_type EVENT_NONE; + static const event_type EVENT_NONE; /** * A reactor has been started. Events of this type point to the reactor. */ - PN_CPP_EXTERN static const event_type REACTOR_INIT; + static const event_type REACTOR_INIT; /** * A reactor has no more events to process. Events of this type * point to the reactor. */ - PN_CPP_EXTERN static const event_type REACTOR_QUIESCED; + static const event_type REACTOR_QUIESCED; /** * A reactor has been stopped. Events of this type point to the reactor. */ - PN_CPP_EXTERN static const event_type REACTOR_FINAL; + static const event_type REACTOR_FINAL; /** * A timer event has occurred. */ - PN_CPP_EXTERN static const event_type TIMER_TASK; + static const event_type TIMER_TASK; /** * The connection has been created. This is the first event that * will ever be issued for a connection. Events of this type point * to the relevant connection. 
*/ - PN_CPP_EXTERN static const event_type CONNECTION_INIT; + static const event_type CONNECTION_INIT; /** * The connection has been bound to a transport. This event is * issued when transport::bind() is called. */ - PN_CPP_EXTERN static const event_type CONNECTION_BOUND; + static const event_type CONNECTION_BOUND; /** * The connection has been unbound from its transport. This event is * issued when transport::unbind() is called. */ - PN_CPP_EXTERN static const event_type CONNECTION_UNBOUND; + static const event_type CONNECTION_UNBOUND; /** * The local connection endpoint has been opened. Events of this * type point to the relevant connection. */ - PN_CPP_EXTERN static const event_type CONNECTION_LOCAL_OPEN; + static const event_type CONNECTION_LOCAL_OPEN; /** * The remote endpoint has opened the connection. Events of this * type point to the relevant connection. */ - PN_CPP_EXTERN static const event_type CONNECTION_REMOTE_OPEN; + static const event_type CONNECTION_REMOTE_OPEN; /** * The local connection endpoint has been closed. Events of this * type point to the relevant connection. */ - PN_CPP_EXTERN static const event_type CONNECTION_LOCAL_CLOSE; + static const event_type CONNECTION_LOCAL_CLOSE; /** * The remote endpoint has closed the connection. Events of this * type point to the relevant connection. */ - PN_CPP_EXTERN static const event_type CONNECTION_REMOTE_CLOSE; + static const event_type CONNECTION_REMOTE_CLOSE; /** * The connection has been freed and any outstanding processing has * been completed. This is the final event that will ever be issued * for a connection. */ - PN_CPP_EXTERN static const event_type CONNECTION_FINAL; + static const event_type CONNECTION_FINAL; /** * The session has been created. This is the first event that will * ever be issued for a session. */ - PN_CPP_EXTERN static const event_type SESSION_INIT; + static const event_type SESSION_INIT; /** * The local session endpoint has been opened. 
Events of this type * point to the relevant session. */ - PN_CPP_EXTERN static const event_type SESSION_LOCAL_OPEN; + static const event_type SESSION_LOCAL_OPEN; /** * The remote endpoint has opened the session. Events of this type * point to the relevant session. */ - PN_CPP_EXTERN static const event_type SESSION_REMOTE_OPEN; + static const event_type SESSION_REMOTE_OPEN; /** * The local session endpoint has been closed. Events of this type * point to the relevant session. */ - PN_CPP_EXTERN static const event_type SESSION_LOCAL_CLOSE; + static const event_type SESSION_LOCAL_CLOSE; /** * The remote endpoint has closed the session. Events of this type * point to the relevant session. */ - PN_CPP_EXTERN static const event_type SESSION_REMOTE_CLOSE; + static const event_type SESSION_REMOTE_CLOSE; /** * The session has been freed and any outstanding processing has * been completed. This is the final event that will ever be issued * for a session. */ - PN_CPP_EXTERN static const event_type SESSION_FINAL; + static const event_type SESSION_FINAL; /** * The link has been created. This is the first event that will ever * be issued for a link. */ - PN_CPP_EXTERN static const event_type LINK_INIT; + static const event_type LINK_INIT; /** * The local link endpoint has been opened. Events of this type * point to the relevant link. */ - PN_CPP_EXTERN static const event_type LINK_LOCAL_OPEN; + static const event_type LINK_LOCAL_OPEN; /** * The remote endpoint has opened the link. Events of this type * point to the relevant link. */ - PN_CPP_EXTERN static const event_type LINK_REMOTE_OPEN; + static const event_type LINK_REMOTE_OPEN; /** * The local link endpoint has been closed. Events of this type * point to the relevant link. */ - PN_CPP_EXTERN static const event_type LINK_LOCAL_CLOSE; + static const event_type LINK_LOCAL_CLOSE; /** * The remote endpoint has closed the link. Events of this type * point to the relevant link. 
*/ - PN_CPP_EXTERN static const event_type LINK_REMOTE_CLOSE; + static const event_type LINK_REMOTE_CLOSE; /** * The local link endpoint has been detached. Events of this type * point to the relevant link. */ - PN_CPP_EXTERN static const event_type LINK_LOCAL_DETACH; + static const event_type LINK_LOCAL_DETACH; /** * The remote endpoint has detached the link. Events of this type * point to the relevant link. */ - PN_CPP_EXTERN static const event_type LINK_REMOTE_DETACH; + static const event_type LINK_REMOTE_DETACH; /** * The flow control state for a link has changed. Events of this * type point to the relevant link. */ - PN_CPP_EXTERN static const event_type LINK_FLOW; + static const event_type LINK_FLOW; /** * The link has been freed and any outstanding processing has been * completed. This is the final event that will ever be issued for a * link. Events of this type point to the relevant link. */ - PN_CPP_EXTERN static const event_type LINK_FINAL; + static const event_type LINK_FINAL; /** * A delivery has been created or updated. Events of this type point * to the relevant delivery. */ - PN_CPP_EXTERN static const event_type DELIVERY; + static const event_type DELIVERY; /** * The transport has new data to read and/or write. Events of this * type point to the relevant transport. */ - PN_CPP_EXTERN static const event_type TRANSPORT; + static const event_type TRANSPORT; /** * The transport has authenticated, if this is received by a server @@ -233,64 +233,67 @@ class proton_event : public event * and transport::user() can be used to obtain the authenticated * user. */ - PN_CPP_EXTERN static const event_type TRANSPORT_AUTHENTICATED; + static const event_type TRANSPORT_AUTHENTICATED; /** * Indicates that a transport error has occurred. Use * transport::condition() to access the details of the error * from the associated transport. 
*/ - PN_CPP_EXTERN static const event_type TRANSPORT_ERROR; + static const event_type TRANSPORT_ERROR; /** * Indicates that the head of the transport has been closed. This * means the transport will never produce more bytes for output to * the network. Events of this type point to the relevant transport. */ - PN_CPP_EXTERN static const event_type TRANSPORT_HEAD_CLOSED; + static const event_type TRANSPORT_HEAD_CLOSED; /** * Indicates that the tail of the transport has been closed. This * means the transport will never be able to process more bytes from * the network. Events of this type point to the relevant transport. */ - PN_CPP_EXTERN static const event_type TRANSPORT_TAIL_CLOSED; + static const event_type TRANSPORT_TAIL_CLOSED; /** * Indicates that both the head and tail of the transport are * closed. Events of this type point to the relevant transport. */ - PN_CPP_EXTERN static const event_type TRANSPORT_CLOSED; + static const event_type TRANSPORT_CLOSED; - PN_CPP_EXTERN static const event_type SELECTABLE_INIT; - PN_CPP_EXTERN static const event_type SELECTABLE_UPDATED; - PN_CPP_EXTERN static const event_type SELECTABLE_READABLE; - PN_CPP_EXTERN static const event_type SELECTABLE_WRITABLE; - PN_CPP_EXTERN static const event_type SELECTABLE_ERROR; - PN_CPP_EXTERN static const event_type SELECTABLE_EXPIRED; - PN_CPP_EXTERN static const event_type SELECTABLE_FINAL; + static const event_type SELECTABLE_INIT; + static const event_type SELECTABLE_UPDATED; + static const event_type SELECTABLE_READABLE; + static const event_type SELECTABLE_WRITABLE; + static const event_type SELECTABLE_ERROR; + static const event_type SELECTABLE_EXPIRED; + static const event_type SELECTABLE_FINAL; ///@} - virtual PN_CPP_EXTERN void dispatch(handler &h); - virtual PN_CPP_EXTERN class container &container() const; - virtual PN_CPP_EXTERN class connection &connection() const; - virtual PN_CPP_EXTERN class sender& sender() const; - virtual PN_CPP_EXTERN class receiver& receiver() 
const; - virtual PN_CPP_EXTERN class link& link() const; - virtual PN_CPP_EXTERN class delivery& delivery() const; + virtual void dispatch(handler &h); + virtual class event_loop &event_loop() const; + virtual class container &container() const; + virtual class engine &engine() const; + virtual class connection &connection() const; + virtual class sender& sender() const; + virtual class receiver& receiver() const; + virtual class link& link() const; + virtual class delivery& delivery() const; /** Get type of event */ - PN_CPP_EXTERN event_type type() const; + event_type type() const; - PN_CPP_EXTERN pn_event_t* pn_event() const; + pn_event_t* pn_event() const; protected: - PN_CPP_EXTERN proton_event(pn_event_t *ce, proton_event::event_type t, class container &c); + proton_event(pn_event_t *, proton_event::event_type, class event_loop *); + private: mutable pn_event_t *pn_event_; event_type type_; - class container &container_; + class event_loop *event_loop_; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/url.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/url.cpp b/proton-c/bindings/cpp/src/url.cpp index b9407de..ec02e53 100644 --- a/proton-c/bindings/cpp/src/url.cpp +++ b/proton-c/bindings/cpp/src/url.cpp @@ -22,8 +22,7 @@ #include "proton/error.hpp" #include "proton/url.hpp" #include "proton/url.h" -#include <ostream> -#include <istream> +#include <sstream> namespace proton { @@ -94,6 +93,18 @@ void url::defaults() { const std::string url::AMQP("amqp"); const std::string url::AMQPS("amqps"); +uint16_t url::port_int() const { + // TODO aconway 2015-10-27: full service name lookup + if (port() == AMQP) return 5672; + if (port() == AMQPS) return 5671; + std::istringstream is(port()); + uint16_t result; + is >> result; + if (is.fail()) + throw url_error("invalid port '" + port() + "'"); + return result; +} + std::ostream& operator<<(std::ostream& o, 
const url& u) { return o << u.str(); } std::istream& operator>>(std::istream& i, url& u) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
