PROTON-1566: [C++ binding] Reconnect - Implemented retry with exponential backoff
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/740b9509 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/740b9509 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/740b9509 Branch: refs/heads/go1 Commit: 740b95099f350980f7156821e65d9947147c80bc Parents: b60f093 Author: Andrew Stitcher <[email protected]> Authored: Thu Aug 31 10:47:36 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Thu Aug 31 10:54:31 2017 -0400 ---------------------------------------------------------------------- proton-c/bindings/cpp/CMakeLists.txt | 2 +- .../cpp/include/proton/connection_options.hpp | 16 ++- proton-c/bindings/cpp/include/proton/fwd.hpp | 2 +- .../cpp/include/proton/reconnect_options.hpp | 84 ++++++++++++++++ .../cpp/include/proton/reconnect_timer.hpp | 71 ------------- .../bindings/cpp/src/connection_options.cpp | 20 ++-- proton-c/bindings/cpp/src/contexts.cpp | 7 +- proton-c/bindings/cpp/src/include/contexts.hpp | 17 +++- .../cpp/src/include/proactor_container_impl.hpp | 11 ++ .../cpp/src/include/reconnect_options_impl.hpp | 41 ++++++++ .../cpp/src/proactor_container_impl.cpp | 100 +++++++++++++++++-- proton-c/bindings/cpp/src/reconnect_options.cpp | 43 ++++++++ proton-c/bindings/cpp/src/reconnect_timer.cpp | 64 ------------ 13 files changed, 312 insertions(+), 166 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index 472105a..330858a 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt @@ -56,7 +56,7 @@ set(qpid-proton-cpp-source src/proton_bits.cpp src/receiver.cpp src/receiver_options.cpp - src/reconnect_timer.cpp + src/reconnect_options.cpp src/returned.cpp src/sasl.cpp src/scalar_base.cpp http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/connection_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp b/proton-c/bindings/cpp/include/proton/connection_options.hpp index 62af5f3..066e8cf 100644 --- a/proton-c/bindings/cpp/include/proton/connection_options.hpp +++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp @@ -22,12 +22,12 @@ * */ +#include "./duration.hpp" #include "./fwd.hpp" -#include "./types_fwd.hpp" #include "./internal/config.hpp" #include "./internal/export.hpp" #include "./internal/pn_unique_ptr.hpp" -#include "./duration.hpp" +#include "./types_fwd.hpp" #include <proton/type_compat.h> @@ -123,13 +123,6 @@ class connection_options { /// container::listen. PN_CPP_EXTERN connection_options& password(const std::string&); - /// @cond INTERNAL - // XXX settle questions about reconnect_timer - consider simply - // reconnect_options and making reconnect_timer internal - /// **Experimental** - PN_CPP_EXTERN connection_options& reconnect(const reconnect_timer&); - /// @endcond - /// Set SSL client options. PN_CPP_EXTERN connection_options& ssl_client_options(const class ssl_client_options&); @@ -153,6 +146,11 @@ class connection_options { /// **Unsettled API** - Set the SASL configuration path. PN_CPP_EXTERN connection_options& sasl_config_path(const std::string&); + /// **Experimental** - Options for reconnect on outgoing connections. + PN_CPP_EXTERN connection_options& reconnect(reconnect_options &); + + + /// Update option values from values set in other. PN_CPP_EXTERN connection_options& update(const connection_options& other); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/fwd.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp b/proton-c/bindings/cpp/include/proton/fwd.hpp index efbb91b..a394579 100644 --- a/proton-c/bindings/cpp/include/proton/fwd.hpp +++ b/proton-c/bindings/cpp/include/proton/fwd.hpp @@ -40,7 +40,7 @@ class listener; class receiver; class receiver_iterator; class receiver_options; -class reconnect_timer; +class reconnect_options; class sasl; class sender; class sender_iterator; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/reconnect_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp new file mode 100644 index 0000000..e8ed02c --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp @@ -0,0 +1,84 @@ +#ifndef PROTON_RECONNECT_OPTIONS_HPP +#define PROTON_RECONNECT_OPTIONS_HPP + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "./internal/export.hpp" +#include "./internal/pn_unique_ptr.hpp" +#include "./duration.hpp" +#include "./source.hpp" + +#include <string> + +namespace proton { + +/// **Experimental** - Options that determine a series of delays to +/// coordinate reconnection attempts. They may be open ended or +/// limited in time. They may be evenly spaced or increasing at an +/// exponential rate. +/// +/// Options can be "chained" (@see proton::connection_options). +/// +/// Normal value semantics: copy or assign creates a separate copy of +/// the options. +class reconnect_options { + public: + + /// Create an empty set of options. + PN_CPP_EXTERN reconnect_options(); + + /// Copy options. + PN_CPP_EXTERN reconnect_options(const reconnect_options&); + + PN_CPP_EXTERN ~reconnect_options(); + + /// Copy options. + PN_CPP_EXTERN reconnect_options& operator=(const reconnect_options&); + + /// Base value for recurring delay (default is 10 milliseconds). + PN_CPP_EXTERN reconnect_options& delay(duration); + + /// Scaling multiplier for successive reconnect delays (default is 2.0) + PN_CPP_EXTERN reconnect_options& delay_multiplier(float); + + /// Maximum delay between successive connect attempts (default + /// duration::FOREVER, i.e. no limit) + PN_CPP_EXTERN reconnect_options& max_delay(duration); + + /// Maximum reconnect attempts (default 0, meaning no limit) + PN_CPP_EXTERN reconnect_options& max_attempts(int); + + /// TODO: failover_urls + + + private: + class impl; + internal::pn_unique_ptr<impl> impl_; + + /// @cond INTERNAL + friend class container; + /// @endcond +}; + +} // proton + +#endif // PROTON_RECONNECT_OPTIONS_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp b/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp deleted file mode 100644 index 766feb7..0000000 --- a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp +++ /dev/null @@ -1,71 +0,0 @@ -#ifndef PROTON_RECONNECT_TIMER_HPP -#define PROTON_RECONNECT_TIMER_HPP - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/// @cond INTERNAL -/// XXX Needs more discussion - -#include "./internal/export.hpp" -#include "./duration.hpp" -#include "./timestamp.hpp" - -#include <proton/type_compat.h> - -namespace proton { - -/// **Experimental** - A class that generates a series of delays to -/// coordinate reconnection attempts. They may be open ended or -/// limited in time. They may be evenly spaced or doubling at an -/// exponential rate. -class reconnect_timer { - public: - PN_CPP_EXTERN reconnect_timer(uint32_t first = 0, int32_t max = -1, uint32_t increment = 100, - bool doubling = true, int32_t max_retries = -1, int32_t timeout = -1); - - /// Indicate a successful connection, resetting the internal timer - /// values. - PN_CPP_EXTERN void reset(); - - /// Obtain the timer's computed time to delay before attempting a - /// reconnection attempt (in milliseconds). -1 means that the - /// retry limit or timeout has been exceeded and reconnection - /// attempts should cease. - PN_CPP_EXTERN int next_delay(timestamp now); - - private: - duration first_delay_; - duration max_delay_; - duration increment_; - bool doubling_; - int32_t max_retries_; - duration timeout_; - int32_t retries_; - duration next_delay_; - timestamp timeout_deadline_; -}; - -} // proton - -/// @endcond - -#endif // PROTON_RECONNECT_TIMER_HPP http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/connection_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_options.cpp b/proton-c/bindings/cpp/src/connection_options.cpp index 0848c73..ff97764 100644 --- a/proton-c/bindings/cpp/src/connection_options.cpp +++ b/proton-c/bindings/cpp/src/connection_options.cpp @@ -18,11 +18,12 @@ * under the License. * */ -#include "proton/fwd.hpp" -#include "proton/connection.hpp" #include "proton/connection_options.hpp" + +#include "proton/connection.hpp" +#include "proton/fwd.hpp" #include "proton/messaging_handler.hpp" -#include "proton/reconnect_timer.hpp" +#include "proton/reconnect_options.hpp" #include "proton/transport.hpp" #include "proton/ssl.hpp" #include "proton/sasl.hpp" @@ -57,7 +58,7 @@ class connection_options::impl { option<std::string> virtual_host; option<std::string> user; option<std::string> password; - option<reconnect_timer> reconnect; + option<reconnect_options> reconnect; option<class ssl_client_options> ssl_client_options; option<class ssl_server_options> ssl_server_options; option<bool> sasl_enabled; @@ -73,16 +74,15 @@ class connection_options::impl { * transport options (set once per transport over the life of the * connection). */ - void apply_unbound(connection& c) { + void apply_unbound(connection& c, const connection_options& co) { pn_connection_t *pnc = unwrap(c); // Only apply connection options if uninit. bool uninit = c.uninitialized(); if (!uninit) return; - bool outbound = !connection_context::get(pnc).listener_context_; - if (reconnect.set && outbound) - connection_context::get(pnc).reconnect.reset(new reconnect_timer(reconnect.value)); + if (reconnect.set) + connection_context::get(pnc).reconnect_context_.reset(new reconnect_context(reconnect.value, co)); if (container_id.set) pn_connection_set_container(pnc, container_id.value.c_str()); if (virtual_host.set) @@ -187,7 +187,7 @@ connection_options& connection_options::container_id(const std::string &id) { im connection_options& connection_options::virtual_host(const std::string &id) { impl_->virtual_host = id; return *this; } connection_options& connection_options::user(const std::string &user) { impl_->user = user; return *this; } connection_options& connection_options::password(const std::string &password) { impl_->password = password; return *this; } -connection_options& connection_options::reconnect(const reconnect_timer &rc) { impl_->reconnect = rc; return *this; } +connection_options& connection_options::reconnect(reconnect_options &r) { impl_->reconnect = r; return *this; } connection_options& connection_options::ssl_client_options(const class ssl_client_options &c) { impl_->ssl_client_options = c; return *this; } connection_options& connection_options::ssl_server_options(const class ssl_server_options &c) { impl_->ssl_server_options = c; return *this; } connection_options& connection_options::sasl_enabled(bool b) { impl_->sasl_enabled = b; return *this; } @@ -196,7 +196,7 @@ connection_options& connection_options::sasl_allowed_mechs(const std::string &s) connection_options& connection_options::sasl_config_name(const std::string &n) { impl_->sasl_config_name = n; return *this; } connection_options& connection_options::sasl_config_path(const std::string &p) { impl_->sasl_config_path = p; return *this; } -void connection_options::apply_unbound(connection& c) const { impl_->apply_unbound(c); } +void connection_options::apply_unbound(connection& c) const { impl_->apply_unbound(c, *this); } void connection_options::apply_bound(connection& c) const { impl_->apply_bound(c); } messaging_handler* connection_options::handler() const { return impl_->handler.value; } } // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp index 152828a..812d573 100644 --- a/proton-c/bindings/cpp/src/contexts.cpp +++ b/proton-c/bindings/cpp/src/contexts.cpp @@ -20,18 +20,19 @@ */ #include "contexts.hpp" + #include "msg.hpp" #include "proton_bits.hpp" #include "proton/connection_options.hpp" #include "proton/error.hpp" +#include "proton/reconnect_options.hpp" #include <proton/connection.h> #include <proton/object.h> #include <proton/link.h> #include <proton/listener.h> #include <proton/message.h> -#include "proton/reconnect_timer.hpp" #include <proton/session.h> #include <typeinfo> @@ -70,6 +71,10 @@ connection_context::connection_context() : container(0), default_session(0), link_gen(0), handler(0), listener_context_(0) {} +reconnect_context::reconnect_context(const reconnect_options& ro, const connection_options& co) : + reconnect_options_(new reconnect_options(ro)), connection_options_(new connection_options(co)), retries_(0) +{} + listener_context::listener_context() : listen_handler_(0) {} connection_context& connection_context::get(pn_connection_t *c) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/include/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp b/proton-c/bindings/cpp/src/include/contexts.hpp index 0c829db..7920d70 100644 --- a/proton-c/bindings/cpp/src/include/contexts.hpp +++ b/proton-c/bindings/cpp/src/include/contexts.hpp @@ -35,7 +35,7 @@ struct pn_listener_t; namespace proton { class proton_handler; -class reconnect_timer; +class connector; namespace io {class link_namer;} @@ -77,6 +77,7 @@ class context { }; class listener_context; +class reconnect_context; // Connection context used by all connections. class connection_context : public context { @@ -90,11 +91,23 @@ class connection_context : public context { io::link_namer* link_gen; // Link name generator. messaging_handler* handler; - internal::pn_unique_ptr<reconnect_timer> reconnect; + std::string connected_address_; + internal::pn_unique_ptr<reconnect_context> reconnect_context_; listener_context* listener_context_; work_queue work_queue_; }; +// This is not a context object on its own, but an optional part of connection +class reconnect_context { + public: + reconnect_context(const reconnect_options& ro, const connection_options& co); + + internal::pn_unique_ptr<const reconnect_options> reconnect_options_; + internal::pn_unique_ptr<const connection_options> connection_options_; + duration delay_; + int retries_; +}; + class listener_context : public context { public: listener_context(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp index 0aa62a5..804908a 100644 --- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp +++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp @@ -64,6 +64,10 @@ struct pn_event_t; namespace proton { +namespace internal { +class connector; +} + class container::impl { public: impl(container& c, const std::string& id, messaging_handler* = 0); @@ -99,7 +103,12 @@ class container::impl { class container_work_queue; pn_listener_t* listen_common_lh(const std::string&); pn_connection_t* make_connection_lh(const url& url, const connection_options&); + void setup_connection_lh(const url& url, pn_connection_t *pnc); void start_connection(const url& url, pn_connection_t* c); + void reconnect(pn_connection_t* pnc); + duration next_delay(reconnect_context& rc); + bool setup_reconnect(pn_connection_t* pnc); + void reset_reconnect(pn_connection_t* pnc); // Event loop to run in each container thread void thread(); @@ -136,9 +145,11 @@ class container::impl { proton::sender_options sender_options_; proton::receiver_options receiver_options_; error_condition disconnect_error_; + int retries_; bool auto_stop_; bool stopping_; + friend class connector; }; template <class T> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp new file mode 100644 index 0000000..fc90508 --- /dev/null +++ b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp @@ -0,0 +1,41 @@ +#ifndef PROTON_CPP_RECONNECT_OPTIONSIMPL_H +#define PROTON_CPP_RECONNECT_OPTIONSIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/duration.hpp" + +namespace proton { + +class reconnect_options::impl { + public: + impl() : delay(10), delay_multiplier(2.0), max_delay(duration::FOREVER), max_attempts(0) {} + + duration delay; + float delay_multiplier; + duration max_delay; + int max_attempts; +}; + +} + +#endif /*!PROTON_CPP_RECONNECT_OPTIONSIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp b/proton-c/bindings/cpp/src/proactor_container_impl.cpp index 9870210..ff4d4bb 100644 --- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp +++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp @@ -24,6 +24,7 @@ #include "proton/function.hpp" #include "proton/listener.hpp" #include "proton/listen_handler.hpp" +#include "proton/reconnect_options.hpp" #include "proton/url.hpp" #include "proton/connection.h" @@ -33,6 +34,7 @@ #include "contexts.hpp" #include "messaging_adapter.hpp" +#include "reconnect_options_impl.hpp" #include "proton_bits.hpp" #include <assert.h> @@ -152,6 +154,15 @@ void container::impl::remove_work_queue(container::impl::container_work_queue* l work_queues_.erase(l); } +void container::impl::setup_connection_lh(const url& url, pn_connection_t *pnc) { + pn_connection_set_container(pnc, id_.c_str()); + pn_connection_set_hostname(pnc, url.host().c_str()); + if (!url.user().empty()) + pn_connection_set_user(pnc, url.user().c_str()); + if (!url.password().empty()) + pn_connection_set_password(pnc, url.password().c_str()); +} + pn_connection_t* container::impl::make_connection_lh( const url& url, const connection_options& user_opts) @@ -169,14 +180,10 @@ pn_connection_t* container::impl::make_connection_lh( cc.handler = mh; cc.work_queue_ = new container::impl::connection_work_queue(*container_.impl_, pnc); - pn_connection_set_container(pnc, id_.c_str()); - pn_connection_set_hostname(pnc, url.host().c_str()); - if (!url.user().empty()) - pn_connection_set_user(pnc, url.user().c_str()); - if (!url.password().empty()) - pn_connection_set_password(pnc, url.password().c_str()); - + cc.connected_address_ = url; + setup_connection_lh(url, pnc); make_wrapper(pnc).open(opts); + return pnc; // 1 refcount from pn_connection() } @@ -186,6 +193,70 @@ void container::impl::start_connection(const url& url, pn_connection_t *pnc) { pn_proactor_connect(proactor_, pnc, caddr); // Takes ownership of pnc } +void container::impl::reconnect(pn_connection_t* pnc) { + connection_context& cc = connection_context::get(pnc); + reconnect_context* rc = cc.reconnect_context_.get(); + + // Figure out next connection url to try + const proton::url url(cc.connected_address_); + + cc.connected_address_ = url; + setup_connection_lh(url, pnc); + make_wrapper(pnc).open(*rc->connection_options_); + start_connection(cc.connected_address_, pnc); + rc->retries_++; +} + +duration container::impl::next_delay(reconnect_context& rc) { + // If we've not retried before do it immediately + if (rc.retries_==0) return duration(0); + + const reconnect_options::impl& roi = *rc.reconnect_options_->impl_; + if (rc.retries_==1) { + rc.delay_ = roi.delay; + } else { + rc.delay_ = std::min(roi.max_delay, rc.delay_ * roi.delay_multiplier); + } + return rc.delay_; +} + +void container::impl::reset_reconnect(pn_connection_t* pnc) { + connection_context& cc = connection_context::get(pnc); + reconnect_context* rc = cc.reconnect_context_.get(); + + if (rc) rc->retries_ = 0; +} + +bool container::impl::setup_reconnect(pn_connection_t* pnc) { + connection_context& cc = connection_context::get(pnc); + reconnect_context* rc = cc.reconnect_context_.get(); + + // If reconnect not enabled just fail + if (!rc) return false; + + const reconnect_options::impl& roi = *rc->reconnect_options_->impl_; + + // If too many reconnect attempts just fail + if ( roi.max_attempts != 0 && rc->retries_ >= roi.max_attempts) { + pn_transport_t* t = pn_connection_transport(pnc); + pn_condition_t* condition = pn_transport_condition(t); + pn_condition_format(condition, "proton:io", "Too many reconnect attempts (%d)", rc->retries_); + return false; + } + + // Recover connection from proactor + pn_proactor_release_connection(pnc); + + // Figure out delay till next reconnect + duration delay = next_delay(*rc); + + // Schedule reconnect - can do this on container work queue as no one can have the connection + // now anyway + schedule(delay, make_work(&container::impl::reconnect, this, pnc)); + + return true; +} + returned<connection> container::impl::connect( const std::string& addr, const proton::connection_options& user_opts) @@ -417,6 +488,21 @@ bool container::impl::handle(pn_event_t* event) { return false; } + case PN_CONNECTION_REMOTE_OPEN: { + // This is the only event that we get indicating that the connection succeeded so + // it's the only place to reset the reconnection logic. + // + // Just note we have a connection then process normally + pn_connection_t* c = pn_event_connection(event); + reset_reconnect(c); + break; + } + case PN_TRANSPORT_CLOSED: { + // If reconnect is turned on then handle closed on error here with reconnect attempt + pn_connection_t* c = pn_event_connection(event); + pn_transport_t* t = pn_event_transport(event); + if (pn_condition_is_set(pn_transport_condition(t)) && setup_reconnect(c)) return false; + } default: break; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/reconnect_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/reconnect_options.cpp b/proton-c/bindings/cpp/src/reconnect_options.cpp new file mode 100644 index 0000000..ef0d497 --- /dev/null +++ b/proton-c/bindings/cpp/src/reconnect_options.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/reconnect_options.hpp" +#include "reconnect_options_impl.hpp" + +namespace proton { + +reconnect_options::reconnect_options() : impl_(new impl()) {} +reconnect_options::reconnect_options(const reconnect_options& x) : impl_(new impl()) { + *this = x; +} +reconnect_options::~reconnect_options() {} + +reconnect_options& reconnect_options::operator=(const reconnect_options& x) { + *impl_ = *x.impl_; + return *this; +} + +reconnect_options& reconnect_options::delay(duration d) { impl_->delay = d; return *this; } +reconnect_options& reconnect_options::delay_multiplier(float f) { impl_->delay_multiplier = f; return *this; } +reconnect_options& reconnect_options::max_delay(duration d) { impl_->max_delay = d; return *this; } +reconnect_options& reconnect_options::max_attempts(int i) { impl_->max_attempts = i; return *this; } + +} // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/reconnect_timer.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp b/proton-c/bindings/cpp/src/reconnect_timer.cpp deleted file mode 100644 index a299b0e..0000000 --- a/proton-c/bindings/cpp/src/reconnect_timer.cpp +++ /dev/null @@ -1,64 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/reconnect_timer.hpp" -#include "proton/error.hpp" -#include "msg.hpp" -#include <proton/types.h> - -namespace proton { - -reconnect_timer::reconnect_timer(uint32_t first, int32_t max, uint32_t increment, - bool doubling, int32_t max_retries, int32_t timeout) : - first_delay_(first), max_delay_(max), increment_(increment), doubling_(doubling), - max_retries_(max_retries), timeout_(timeout), retries_(0), next_delay_(-1), timeout_deadline_(0) - {} - -void reconnect_timer::reset() { - retries_ = 0; - next_delay_ = 0; - timeout_deadline_ = 0; -} - -int reconnect_timer::next_delay(timestamp now) { - retries_++; - if (max_retries_ >= 0 && retries_ > max_retries_) - return -1; - - if (retries_ == 1) { - if (timeout_ >= duration(0)) - timeout_deadline_ = now + timeout_; - next_delay_ = first_delay_; - } else if (retries_ == 2) { - next_delay_ = next_delay_ + increment_; - } else { - next_delay_ = next_delay_ + ( doubling_ ? next_delay_ : increment_ ); - } - if (timeout_deadline_ != timestamp(0) && now >= timeout_deadline_) - return -1; - if (max_delay_ >= duration(0) && next_delay_ > max_delay_) - next_delay_ = max_delay_; - if (timeout_deadline_ != timestamp(0) && (now + next_delay_ > timeout_deadline_)) - next_delay_ = timeout_deadline_ - now; - return next_delay_.milliseconds(); -} - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
