Repository: qpid-proton Updated Branches: refs/heads/master 587f3cd8f -> 8e61c86ac
PROTON-1066: connection options and reconnect for C++ binding Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8e61c86a Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8e61c86a Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8e61c86a Branch: refs/heads/master Commit: 8e61c86ac47f3613ab3f28dfdb4aca0db95482fb Parents: 587f3cd Author: Clifford Jansen <[email protected]> Authored: Wed Nov 25 16:10:45 2015 -0800 Committer: Clifford Jansen <[email protected]> Committed: Wed Nov 25 16:14:21 2015 -0800 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 1 + examples/cpp/connection_options.cpp | 73 +++++++++++ proton-c/bindings/cpp/CMakeLists.txt | 2 + .../bindings/cpp/include/proton/connection.hpp | 10 +- .../cpp/include/proton/connection_options.hpp | 104 ++++++++++++++++ .../bindings/cpp/include/proton/container.hpp | 16 ++- .../cpp/include/proton/reconnect_timer.hpp | 63 ++++++++++ .../bindings/cpp/include/proton/transport.hpp | 13 +- .../cpp/src/blocking_connection_impl.cpp | 3 +- proton-c/bindings/cpp/src/connection.cpp | 13 +- .../bindings/cpp/src/connection_options.cpp | 124 +++++++++++++++++++ proton-c/bindings/cpp/src/connector.cpp | 67 +++++++++- proton-c/bindings/cpp/src/connector.hpp | 15 ++- proton-c/bindings/cpp/src/container.cpp | 14 ++- proton-c/bindings/cpp/src/container_impl.cpp | 77 ++++++++++-- proton-c/bindings/cpp/src/container_impl.hpp | 9 +- proton-c/bindings/cpp/src/reconnect_timer.cpp | 69 +++++++++++ proton-c/bindings/cpp/src/transport.cpp | 36 ++++++ 18 files changed, 680 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 8916963..8890e2d 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -35,6 +35,7 @@ set(examples server server_direct recurring_timer + connection_options encode_decode) if (NOT WIN32) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/examples/cpp/connection_options.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/connection_options.cpp b/examples/cpp/connection_options.cpp new file mode 100644 index 0000000..da3c4d9 --- /dev/null +++ b/examples/cpp/connection_options.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/container.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/url.hpp" +#include "proton/transport.hpp" + +#include <iostream> + +using proton::connection_options; + +class handler_2 : public proton::messaging_handler { + void on_connection_opened(proton::event &e) { + std::cout << "connection events going to handler_2" << std::endl; + std::cout << "connection max_frame_size: " << e.connection().transport().max_frame_size() << + ", idle timeout: " << e.connection().transport().idle_timeout() << std::endl; + e.connection().close(); + } +}; + +class main_handler : public proton::messaging_handler { + private: + proton::url url; + handler_2 conn_handler; + + public: + main_handler(const proton::url& u) : url(u) {} + + void on_start(proton::event &e) { + // Connection options for this connection. Merged with and overriding the container's + // client_connection_options() settings. + e.container().connect(url, connection_options().handler(&conn_handler).max_frame_size(2468)); + } + + void on_connection_opened(proton::event &e) { + std::cout << "unexpected connection event on main handler" << std::endl; + e.connection().close(); + } +}; + +int main(int argc, char **argv) { + try { + std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples"; + main_handler handler(url); + proton::container container(handler); + // Global connection options for future connections on container. + container.client_connection_options(connection_options().max_frame_size(12345).idle_timeout(15000)); + container.run(); + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index bf9af46..47ca937 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -33,6 +33,7 @@ set(qpid-proton-cpp-source src/blocking_receiver.cpp src/blocking_sender.cpp src/connection.cpp + src/connection_options.cpp src/connector.cpp src/container.cpp src/container_impl.cpp @@ -59,6 +60,7 @@ set(qpid-proton-cpp-source src/proton_handler.cpp src/reactor.cpp src/receiver.cpp + src/reconnect_timer.cpp src/request_response.cpp src/sender.cpp src/session.cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/connection.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp b/proton-c/bindings/cpp/include/proton/connection.hpp index b662b9c..bb438ed 100644 --- a/proton-c/bindings/cpp/include/proton/connection.hpp +++ b/proton-c/bindings/cpp/include/proton/connection.hpp @@ -26,6 +26,7 @@ #include "proton/link.hpp" #include "proton/object.hpp" #include "proton/session.hpp" +#include "proton/connection_options.hpp" #include "proton/types.h" #include <string> @@ -52,9 +53,12 @@ class connection : public object<pn_connection_t>, endpoint /// Get the container, throw an exception if event_loop is not a container. PN_CPP_EXTERN class container &container() const; - /// Get the engine, , throw an exception if event_loop is not an engine. + /// Get the engine, throw an exception if event_loop is not an engine. PN_CPP_EXTERN class engine &engine() const; + /// Get the transport for the connection. + PN_CPP_EXTERN class transport transport() const; + /// Return the AMQP host name for the connection. PN_CPP_EXTERN std::string host() const; @@ -102,6 +106,10 @@ class connection : public object<pn_connection_t>, endpoint /** Get the endpoint state */ PN_CPP_EXTERN endpoint::state state() const; + + friend class connection_options; + friend class connector; + friend class transport; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/connection_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp new file mode 100644 index 0000000..2755adf --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp @@ -0,0 +1,104 @@ +#ifndef PROTON_CPP_CONNECTION_OPTIONS_H +#define PROTON_CPP_CONNECTION_OPTIONS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/config.hpp" +#include "proton/export.hpp" +#include "proton/pn_unique_ptr.hpp" +#include "proton/reconnect_timer.hpp" +#include "proton/types.hpp" +//#include "proton/ssl.hpp" + +#include <vector> +#include <string> + +namespace proton { + +class handler; +class connection; + +/** Options for creating a connection. + * + * Options can be "chained" like this: + * + * c = container.connect(url, connection_options().handler(h).max_frame_size(1234)); + * + * You can also create an options object with common settings and use it as a base + * for different connections that have mostly the same settings: + * + * connection_options opts; + * opts.idle_timeout(1000).max_frame_size(10000); + * c1 = container.connect(url1, opts.handler(h1)); + * c2 = container.connect(url2, opts.handler(h2)); + * + * Normal value semantics, copy or assign creates a separate copy of the options. + */ +class connection_options { + public: + PN_CPP_EXTERN connection_options(); + PN_CPP_EXTERN connection_options(const connection_options&); + PN_CPP_EXTERN ~connection_options(); + PN_CPP_EXTERN connection_options& operator=(const connection_options&); + + /// Override with options from other. + PN_CPP_EXTERN void override(const connection_options& other); + + // TODO: Document options + + PN_CPP_EXTERN connection_options& handler(class handler *); + PN_CPP_EXTERN connection_options& max_frame_size(uint32_t max); + PN_CPP_EXTERN connection_options& max_channels(uint16_t max); + PN_CPP_EXTERN connection_options& idle_timeout(uint32_t t); + PN_CPP_EXTERN connection_options& heartbeat(uint32_t t); + PN_CPP_EXTERN connection_options& container_id(const std::string &id); + PN_CPP_EXTERN connection_options& reconnect(const reconnect_timer &); +#ifdef PN_CPP_SOON + PN_CPP_EXTERN connection_options& client_domain(const class client_domain &); + PN_CPP_EXTERN connection_options& server_domain(const class server_domain &); + PN_CPP_EXTERN connection_options& peer_hostname(const std::string &name); + PN_CPP_EXTERN connection_options& resume_id(const std::string &id); + PN_CPP_EXTERN connection_options& sasl_enabled(bool); + PN_CPP_EXTERN connection_options& allow_insecure_mechs(bool); + PN_CPP_EXTERN connection_options& allowed_mechs(const std::string &); +#endif + private: + void apply(connection&) const; + class handler* handler() const; + static pn_connection_t *pn_connection(connection &); +#ifdef PN_CPP_SOON + bool sasl_enabled() const; + bool allow_insecure_mechs() const; + std::string *allowed_mechs() const; + class client_domain &client_domain(); + class server_domain &server_domain(); +#endif + + class impl; + pn_unique_ptr<impl> impl_; + + friend class container_impl; + friend class connector; +}; + +} // namespace + +#endif /*!PROTON_CPP_CONNECTION_OPTIONS_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/container.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/container.hpp b/proton-c/bindings/cpp/include/proton/container.hpp index 984d37a..2cebcd4 100644 --- a/proton-c/bindings/cpp/include/proton/container.hpp +++ b/proton-c/bindings/cpp/include/proton/container.hpp @@ -27,6 +27,7 @@ #include "proton/pn_unique_ptr.hpp" #include "proton/reactor.hpp" #include "proton/url.hpp" +#include "proton/connection_options.hpp" #include <string> @@ -59,10 +60,10 @@ class container : public event_loop { PN_CPP_EXTERN ~container(); /** Locally open a connection @see connection::open */ - PN_CPP_EXTERN connection connect(const proton::url&, handler *h=0); + PN_CPP_EXTERN connection connect(const proton::url&, const connection_options &opts = connection_options()); /** Open a connection to url and create a receiver with source=url.path() */ - PN_CPP_EXTERN acceptor listen(const proton::url &); + PN_CPP_EXTERN acceptor listen(const proton::url&, const connection_options &opts = connection_options()); /** Run the event loop, return when all connections and acceptors are closed. */ PN_CPP_EXTERN void run(); @@ -82,6 +83,17 @@ class container : public event_loop { // Schedule a timer task event in delay milliseconds. PN_CPP_EXTERN task schedule(int delay, handler *h = 0); + /** Copy the connection options to a template which will be + applied to subsequent outgoing connections. These are applied first + and overriden by additional connection options provided in + other methods */ + PN_CPP_EXTERN void client_connection_options(const connection_options &); + + /** Copy the connection options to a template which will be + applied to incoming connections. These are applied before the + first open event on the connection. */ + PN_CPP_EXTERN void server_connection_options(const connection_options &); + private: pn_unique_ptr<container_impl> impl_; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp b/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp new file mode 100644 index 0000000..f89f47e --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp @@ -0,0 +1,63 @@ +#ifndef PROTON_CPP_RECONNECT_H +#define PROTON_CPP_RECONNECT_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/export.hpp" +#include "proton/types.hpp" +#include "proton/reactor.hpp" +#include <string> + +namespace proton { + +/** A class that generates a series of delays to coordinate reconnection attempts. They may be open ended or limited in time. They may be evenly spaced or doubling at an exponential rate. */ +class reconnect_timer +{ + public: + /** TODO: + */ + PN_CPP_EXTERN reconnect_timer(uint32_t first = 0, int32_t max = -1, uint32_t increment = 100, + bool doubling = true, int32_t max_retries = -1, int32_t timeout = -1); + + /** Indicate a successful connection, resetting the internal timer values */ + PN_CPP_EXTERN void reset(); + + /** Obtain the timer's computed time to delay before attempting a reconnection attempt (in milliseconds). -1 means that the retry limit or timeout has been exceeded and reconnection attempts should cease. */ + PN_CPP_EXTERN int next_delay(); + + private: + int32_t first_delay_; + int32_t max_delay_; + int32_t increment_; + bool doubling_; + int32_t max_retries_; + int32_t timeout_; + int32_t retries_; + int32_t next_delay_; + pn_timestamp_t timeout_deadline_; + reactor reactor_; + friend class connector; +}; + +} + +#endif /*!PROTON_CPP_RECONNECT_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/transport.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp b/proton-c/bindings/cpp/include/proton/transport.hpp index d60786a..0b73a8c 100644 --- a/proton-c/bindings/cpp/include/proton/transport.hpp +++ b/proton-c/bindings/cpp/include/proton/transport.hpp @@ -23,7 +23,7 @@ */ #include "proton/object.hpp" - +#include "proton/types.hpp" #include "proton/export.hpp" struct pn_transport_t; @@ -38,7 +38,16 @@ class transport : public object<pn_transport_t> public: transport(pn_transport_t* t) : object<pn_transport_t>(t) {} - class connection connection() const; + PN_CPP_EXTERN class connection connection() const; + PN_CPP_EXTERN void unbind(); + PN_CPP_EXTERN void bind(class connection &); + PN_CPP_EXTERN uint32_t max_frame_size() const; + PN_CPP_EXTERN uint32_t remote_max_frame_size() const; + PN_CPP_EXTERN uint16_t max_channels() const; + PN_CPP_EXTERN uint16_t remote_max_channels() const; + PN_CPP_EXTERN uint32_t idle_timeout() const; + PN_CPP_EXTERN uint32_t remote_idle_timeout() const; + friend class connection_options; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp index 65ae9bb..7a5d882 100644 --- a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp +++ b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp @@ -50,7 +50,8 @@ blocking_connection_impl::blocking_connection_impl(const url& url, duration time { container_->reactor().start(); container_->reactor().timeout(timeout); - connection_ = container_->connect(url, this); // Set this as handler. + handler *h = static_cast<handler*>(this); // Set this as handler. + connection_ = container_->connect(url, connection_options().handler(h)); wait(connection_opening(connection_)); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp index 11cc29b..5226a12 100644 --- a/proton-c/bindings/cpp/src/connection.cpp +++ b/proton-c/bindings/cpp/src/connection.cpp @@ -24,6 +24,7 @@ #include "proton/handler.hpp" #include "proton/session.hpp" #include "proton/error.hpp" +#include "connector.hpp" #include "msg.hpp" #include "contexts.hpp" @@ -39,7 +40,17 @@ namespace proton { connection_context& connection::context() const { return connection_context::get(pn_object()); } -void connection::open() { pn_connection_open(pn_object()); } +transport connection::transport() const { + return pn_connection_transport(pn_object()); +} + +void connection::open() { + connector *connector = dynamic_cast<class connector*>(context().handler.get()); + if (connector) + connector->apply_options(); + // Inbound connections should already be configured. + pn_connection_open(pn_object()); +} void connection::close() { pn_connection_close(pn_object()); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connection_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp new file mode 100644 index 0000000..6d5b829 --- /dev/null +++ b/proton-c/bindings/cpp/src/connection_options.cpp @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/connection_options.hpp" +#include "proton/reconnect_timer.hpp" +#include "proton/transport.hpp" +#include "contexts.hpp" +#include "connector.hpp" +#include "msg.hpp" + +#include "proton/transport.h" + +namespace proton { + +template <class T> struct option { + T value; + bool set; + + option() : value(), set(false) {} + option& operator=(const T& x) { value = x; set = true; return *this; } + void override(const option<T>& x) { if (x.set) *this = x.value; } +}; + +class connection_options::impl { + public: + option<class handler*> handler; + option<uint32_t> max_frame_size; + option<uint16_t> max_channels; + option<uint32_t> idle_timeout; + option<uint32_t> heartbeat; + option<std::string> container_id; + option<reconnect_timer> reconnect; +#ifdef PN_CCP_SOON + option<class client_domain> client_domain; + option<class server_domain> server_domain; + option<std::string> peer_hostname; + option<std::string> resume_id; + option<bool> sasl_enabled; + option<std::string> allowed_mechs; + option<bool> allow_insecure_mechs; +#endif + + void apply(connection& c) { + pn_connection_t *pnc = connection_options::pn_connection(c); + pn_transport_t *pnt = pn_connection_transport(pnc); + connector *outbound = dynamic_cast<connector*>(c.context().handler.get()); + bool uninit = (c.state() & endpoint::LOCAL_UNINIT); + + // pnt is NULL between reconnect attempts. + // Only apply transport options if uninit or outbound with + // transport not yet configured. + if (pnt && (uninit || (outbound && !outbound->transport_configured()))) + { + if (max_frame_size.set) + pn_transport_set_max_frame(pnt, max_frame_size.value); + if (max_channels.set) + pn_transport_set_channel_max(pnt, max_channels.value); + if (idle_timeout.set) + pn_transport_set_idle_timeout(pnt, idle_timeout.value); + } + // Only apply connection options if uninit. + if (uninit) { + if (reconnect.set && outbound) + outbound->reconnect_timer(reconnect.value); + if (container_id.set) + pn_connection_set_container(pnc, container_id.value.c_str()); + } + } + + void override(const impl& x) { + handler.override(x.handler); + max_frame_size.override(x.max_frame_size); + max_channels.override(x.max_channels); + idle_timeout.override(x.idle_timeout); + heartbeat.override(x.heartbeat); + container_id.override(x.container_id); + reconnect.override(x.reconnect); + } + +}; + +connection_options::connection_options() : impl_(new impl()) {} +connection_options::connection_options(const connection_options& x) : impl_(new impl()) { + *this = x; +} +connection_options::~connection_options() {} + +connection_options& connection_options::operator=(const connection_options& x) { + *impl_ = *x.impl_; + return *this; +} + +void connection_options::override(const connection_options& x) { impl_->override(*x.impl_); } + +connection_options& connection_options::handler(class handler *h) { impl_->handler = h; return *this; } +connection_options& connection_options::max_frame_size(uint32_t n) { impl_->max_frame_size = n; return *this; } +connection_options& connection_options::max_channels(uint16_t n) { impl_->max_frame_size = n; return *this; } +connection_options& connection_options::idle_timeout(uint32_t t) { impl_->idle_timeout = t; return *this; } +connection_options& connection_options::heartbeat(uint32_t t) { impl_->heartbeat = t; return *this; } +connection_options& connection_options::container_id(const std::string &id) { impl_->container_id = id; return *this; } +connection_options& connection_options::reconnect(const reconnect_timer &rc) { impl_->reconnect = rc; return *this; } + +void connection_options::apply(connection& c) const { impl_->apply(c); } +handler* connection_options::handler() const { return impl_->handler.value; } + +pn_connection_t* connection_options::pn_connection(connection &c) { return c.pn_object(); } +} // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connector.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connector.cpp b/proton-c/bindings/cpp/src/connector.cpp index a8614cd..2547ff5 100644 --- a/proton-c/bindings/cpp/src/connector.cpp +++ b/proton-c/bindings/cpp/src/connector.cpp @@ -19,43 +19,98 @@ * */ +#include "connector.hpp" #include "proton/connection.hpp" #include "proton/transport.hpp" #include "proton/container.hpp" #include "proton/event.hpp" -#include "proton/connection.h" #include "proton/url.hpp" +#include "proton/reconnect_timer.hpp" +#include "proton/task.hpp" +#include "container_impl.hpp" -#include "connector.hpp" +#include "proton/connection.h" +#include "proton/transport.h" namespace proton { -connector::connector(connection &c) : connection_(c) {} +connector::connector(connection&c, const connection_options &opts) : + connection_(c), options_(opts), reconnect_timer_(0), transport_configured_(false) +{} -connector::~connector() {} +connector::~connector() { delete reconnect_timer_; } void connector::address(const url &a) { address_ = a; } +void connector::apply_options() { + if (!connection_) return; + options_.apply(connection_); +} + +bool connector::transport_configured() { return transport_configured_; } + +void connector::reconnect_timer(const class reconnect_timer &rt) { + delete reconnect_timer_; + reconnect_timer_ = new class reconnect_timer(rt); + reconnect_timer_->reactor_ = connection_.container().reactor(); +} + void connector::connect() { connection_.container_id(connection_.container().id()); connection_.host(address_.host_port()); + transport t(pn_transport()); + t.bind(connection_); + // Apply options to the new transport. + options_.apply(connection_); + transport_configured_ = true; } void connector::on_connection_local_open(event &e) { connect(); } -void connector::on_connection_remote_open(event &e) {} +void connector::on_connection_remote_open(event &e) { + if (reconnect_timer_) { + reconnect_timer_->reset(); + } +} void connector::on_connection_init(event &e) { } +void connector::on_transport_tail_closed(event &e) { + on_transport_closed(e); +} + void connector::on_transport_closed(event &e) { - // TODO: prepend with reconnect logic + if (!connection_) return; + if (connection_.state() & endpoint::LOCAL_ACTIVE) { + if (reconnect_timer_) { + e.connection().transport().unbind(); + transport_configured_ = false; + int delay = reconnect_timer_->next_delay(); + if (delay >= 0) { + if (delay == 0) { + // log "Disconnected, reconnecting..." + connect(); + return; + } + else { + // log "Disconnected, reconnecting in " << delay << " milliseconds" + connection_.container().schedule(delay, this); + return; + } + } + } + } connection_.release(); connection_ = 0; } +void connector::on_timer_task(event &e) { + connect(); +} + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connector.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connector.hpp b/proton-c/bindings/cpp/src/connector.hpp index 818282a..458e2ba 100644 --- a/proton-c/bindings/cpp/src/connector.hpp +++ b/proton-c/bindings/cpp/src/connector.hpp @@ -23,9 +23,10 @@ */ #include "proton/proton_handler.hpp" +#include "proton/connection_options.hpp" +#include "proton/url.hpp" #include "proton/event.h" #include "proton/reactor.h" -#include "proton/url.h" #include <string> @@ -34,22 +35,32 @@ namespace proton { class event; class connection; class transport; +class reconnect_timer; class connector : public proton_handler { public: - connector(connection &c); + connector(connection &c, const connection_options &opts); ~connector(); void address(const url&); + const url &address() const { return address_; } void connect(); + void apply_options(); + void reconnect_timer(const class reconnect_timer &); + bool transport_configured(); virtual void on_connection_local_open(event &e); virtual void on_connection_remote_open(event &e); virtual void on_connection_init(event &e); virtual void on_transport_closed(event &e); + virtual void on_transport_tail_closed(event &e); + virtual void on_timer_task(event &e); private: connection connection_; url address_; + connection_options options_; + class reconnect_timer *reconnect_timer_; + bool transport_configured_; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp index f19b72b..34bd2a0 100644 --- a/proton-c/bindings/cpp/src/container.cpp +++ b/proton-c/bindings/cpp/src/container.cpp @@ -48,7 +48,9 @@ container::container(messaging_handler &mhandler, const std::string& id) : container::~container() {} -connection container::connect(const url &host, handler *h) { return impl_->connect(host, h); } +connection container::connect(const url &host, const connection_options &opts) { + return impl_->connect(host, opts); +} reactor container::reactor() const { return impl_->reactor_; } @@ -64,10 +66,18 @@ receiver container::open_receiver(const proton::url &url) { return impl_->open_receiver(url); } -acceptor container::listen(const proton::url &url) { +acceptor container::listen(const proton::url &url, const connection_options &opts) { +#ifdef PN_COMING_SOON + return impl_->listen(url, opts); +#else return impl_->listen(url); +#endif } task container::schedule(int delay, handler *h) { return impl_->schedule(delay, h); } +void container::client_connection_options(const connection_options &o) { impl_->client_connection_options(o); } + +void container::server_connection_options(const connection_options &o) { impl_->server_connection_options(o); } + } // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp index 8a0b56e..bb4f4c5 100644 --- a/proton-c/bindings/cpp/src/container_impl.cpp +++ b/proton-c/bindings/cpp/src/container_impl.cpp @@ -19,6 +19,7 @@ * */ #include "proton/container.hpp" +#include "proton/connection_options.hpp" #include "proton/event.hpp" #include "messaging_event.hpp" #include "proton/connection.hpp" @@ -78,8 +79,9 @@ class override_handler : public handler { public: counted_ptr<pn_handler_t> base_handler; + container_impl &container_impl_; - override_handler(pn_handler_t *h) : base_handler(h) {} + override_handler(pn_handler_t *h, container_impl &c) : base_handler(h), container_impl_(c) {} virtual void on_unhandled(event &e) { proton_event *pne = dynamic_cast<proton_event *>(&e); @@ -90,9 +92,17 @@ class override_handler : public handler pn_event_t *cevent = pne->pn_event(); pn_connection_t *conn = pn_event_connection(cevent); - if (conn && type != PN_CONNECTION_INIT) { + if (conn) { handler *override = connection_context::get(conn).handler.get(); - if (override) e.dispatch(*override); + if (override && type != PN_CONNECTION_INIT) { + // Send event to connector + e.dispatch(*override); + } + else if (!override && type == PN_CONNECTION_INIT) { + // Newly accepted connection from lister socket + connection c(conn); + container_impl_.configure_server_connection(c); + } } pn_handler_dispatch(base_handler.get(), cevent, (pn_event_type_t) type); } @@ -124,7 +134,7 @@ container_impl::container_impl(container& c, handler *h, const std::string& id) // Set our own global handler that "subclasses" the existing one pn_handler_t *global_handler = reactor_.pn_global_handler(); - override_handler_.reset(new override_handler(global_handler)); + override_handler_.reset(new override_handler(global_handler, *this)); counted_ptr<pn_handler_t> cpp_global_handler(cpp_handler(override_handler_.get())); reactor_.pn_global_handler(cpp_global_handler.get()); if (handler_) { @@ -141,10 +151,14 @@ container_impl::container_impl(container& c, handler *h, const std::string& id) container_impl::~container_impl() {} -connection container_impl::connect(const proton::url &url, handler *h) { +connection container_impl::connect(const proton::url &url, const connection_options &user_opts) { + connection_options opts = client_connection_options(); // Defaults + opts.override(user_opts); + handler *h = opts.handler(); + counted_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : counted_ptr<pn_handler_t>(); connection conn(reactor_.connection(chandler.get())); - pn_unique_ptr<connector> ctor(new connector(conn)); + pn_unique_ptr<connector> ctor(new connector(conn, opts)); ctor->address(url); // TODO: url vector connection_context& cc(conn.context()); cc.container_impl = this; @@ -154,7 +168,7 @@ connection container_impl::connect(const proton::url &url, handler *h) { } sender container_impl::open_sender(const proton::url &url) { - connection conn = connect(url, 0); + connection conn = connect(url, connection_options()); std::string path = url.path(); sender snd = conn.default_session().open_sender(id_ + '-' + path); snd.target().address(path); @@ -163,7 +177,7 @@ sender container_impl::open_sender(const proton::url &url) { } receiver container_impl::open_receiver(const proton::url &url) { - connection conn = connect(url, 0); + connection conn = connect(url, connection_options()); std::string path = url.path(); receiver rcv = conn.default_session().open_receiver(id_ + '-' + path); rcv.source().address(path); @@ -172,13 +186,28 @@ receiver container_impl::open_receiver(const proton::url &url) { } acceptor container_impl::listen(const proton::url& url) { +#ifdef PN_COMING_SOON + connection_options opts = server_connection_options(); // Defaults + opts.override(user_opts); + handler *h = opts.handler(); + counted_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : counted_ptr<pn_handler_t>(); + pn_acceptor_t *acptr = pn_reactor_acceptor( + pn_cast(reactor_.get()), url.host().c_str(), url.port().c_str(), chandler.get()); +#else acceptor acptr = reactor_.listen(url); - if (!!acptr) - return acptr; - else +#endif + if (!acptr) throw error(MSG("accept fail: " << pn_error_text(pn_io_error(reactor_.pn_io()))) << "(" << url << ")"); +#ifdef PN_COMING_SOON + // Do not use pn_acceptor_set_ssl_domain(). Manage the incoming connections ourselves for + // more flexibility (i.e. ability to change the server cert for a long running listener). + listener_context& lc(listener_context::get(acptr)); + lc.connection_options = opts; + lc.ssl = url.scheme() == url::AMQPS; +#endif + return acptr; } std::string container_impl::next_link_name() { @@ -195,4 +224,30 @@ task container_impl::schedule(int delay, handler *h) { return reactor_.schedule(delay, task_handler.get()); } +void container_impl::client_connection_options(const connection_options &opts) { + client_connection_options_ = opts; +} + +void container_impl::server_connection_options(const connection_options &opts) { + server_connection_options_ = opts; +} + +void container_impl::configure_server_connection(connection &c) { +#ifdef PN_COMING_SOON + pn_acceptor_t *pnp = pn_connection_acceptor(pn_cast(&c)); + listener_context &lc(listener_context::get(pnp)); + class connection_options &opts(lc.connection_options); + if (opts.sasl_enabled()) { + sasl &s(c.transport().sasl()); + s.allow_insecure_mechs(opts.allow_insecure_mechs()); + if (opts.allowed_mechs()) + s.allowed_mechs(*opts.allowed_mechs()); + } + opts.apply(c); +#else + // Can't distinguish between multiple listeners yet. See PROTON-1054 + server_connection_options_.apply(c); +#endif +} + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp index 2d97780..d9401d0 100644 --- a/proton-c/bindings/cpp/src/container_impl.hpp +++ b/proton-c/bindings/cpp/src/container_impl.hpp @@ -47,7 +47,7 @@ class container_impl public: PN_CPP_EXTERN container_impl(container&, handler *, const std::string& id); PN_CPP_EXTERN ~container_impl(); - PN_CPP_EXTERN connection connect(const url&, handler *h); + PN_CPP_EXTERN connection connect(const url&, const connection_options&); PN_CPP_EXTERN sender open_sender(connection &connection, const std::string &addr, handler *h); PN_CPP_EXTERN sender open_sender(const url&); PN_CPP_EXTERN receiver open_receiver(connection &connection, const std::string &addr, bool dynamic, handler *h); @@ -55,7 +55,12 @@ class container_impl PN_CPP_EXTERN class acceptor listen(const url&); PN_CPP_EXTERN duration timeout(); PN_CPP_EXTERN void timeout(duration timeout); + void client_connection_options(const connection_options &); + const connection_options& client_connection_options() { return client_connection_options_; } + void server_connection_options(const connection_options &); + const connection_options& server_connection_options() { return server_connection_options_; } + void configure_server_connection(connection &c); task schedule(int delay, handler *h); counted_ptr<pn_handler_t> cpp_handler(handler *h); @@ -71,6 +76,8 @@ class container_impl pn_unique_ptr<handler> flow_controller_; std::string id_; uint64_t link_id_; + connection_options client_connection_options_; + connection_options server_connection_options_; friend class container; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/reconnect_timer.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp b/proton-c/bindings/cpp/src/reconnect_timer.cpp new file mode 100644 index 0000000..2a44063 --- /dev/null +++ b/proton-c/bindings/cpp/src/reconnect_timer.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/reconnect_timer.hpp" +#include "proton/reactor.hpp" +#include "proton/error.hpp" +#include "msg.hpp" +#include "proton/types.h" +#include "proton/reactor.h" + +namespace proton { + +reconnect_timer::reconnect_timer(uint32_t first, int32_t max, uint32_t increment, + bool doubling, int32_t max_retries, int32_t timeout) : + first_delay_(first), max_delay_(max), increment_(increment), doubling_(doubling), + max_retries_(max_retries), timeout_(timeout), retries_(0), next_delay_(-1), timeout_deadline_(0), + reactor_(0) {} + +void reconnect_timer::reset() { + retries_ = 0; + next_delay_ = 0; + timeout_deadline_ = 0; +} + +int reconnect_timer::next_delay() { + retries_++; + if (max_retries_ >= 0 && retries_ > max_retries_) + return -1; + if (!reactor_) + throw error(MSG("reconnect timer missing reactor reference")); + pn_timestamp_t now = reactor_.now(); + + if (retries_ == 1) { + if (timeout_ >= 0) + timeout_deadline_ = now + timeout_; + next_delay_ = first_delay_; + } else if (retries_ == 2) { + next_delay_ += increment_; + } else { + next_delay_ += doubling_ ? next_delay_ : increment_; + } + if (timeout_deadline_ && now >= timeout_deadline_) + return -1; + if (max_delay_ >= 0 && next_delay_ > max_delay_) + next_delay_ = max_delay_; + if (timeout_deadline_ && (now + next_delay_ > timeout_deadline_)) + next_delay_ = timeout_deadline_ - now; + return next_delay_; +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/transport.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/transport.cpp b/proton-c/bindings/cpp/src/transport.cpp index 6e6e612..0574663 100644 --- a/proton-c/bindings/cpp/src/transport.cpp +++ b/proton-c/bindings/cpp/src/transport.cpp @@ -20,6 +20,7 @@ */ #include "proton/transport.hpp" #include "proton/connection.hpp" +#include "msg.hpp" #include "proton/transport.h" namespace proton { @@ -28,4 +29,39 @@ connection transport::connection() const { return pn_transport_connection(pn_object()); } +void transport::unbind() { + if (pn_transport_unbind(pn_object())) + throw error(MSG("transport::unbind failed " << pn_error_text(pn_transport_error(pn_object())))); +} + +void transport::bind(class connection &conn) { +// pn_connection_t *c = static_cast<pn_connection_t*>(conn.object_); + if (pn_transport_bind(pn_object(), conn.pn_object())) + throw error(MSG("transport::bind failed " << pn_error_text(pn_transport_error(pn_object())))); +} + +uint32_t transport::max_frame_size() const { + return pn_transport_get_max_frame(pn_object()); +} + +uint32_t transport::remote_max_frame_size() const { + return pn_transport_get_remote_max_frame(pn_object()); +} + +uint16_t transport::max_channels() const { + return pn_transport_get_channel_max(pn_object()); +} + +uint16_t transport::remote_max_channels() const { + return pn_transport_remote_channel_max(pn_object()); +} + +uint32_t transport::idle_timeout() const { + return pn_transport_get_idle_timeout(pn_object()); +} + +uint32_t transport::remote_idle_timeout() const { + return pn_transport_get_remote_idle_timeout(pn_object()); +} + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
