PROTON-1566: [C++ binding] Reconnect
- Implemented retry with exponential backoff


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/740b9509
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/740b9509
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/740b9509

Branch: refs/heads/go1
Commit: 740b95099f350980f7156821e65d9947147c80bc
Parents: b60f093
Author: Andrew Stitcher <[email protected]>
Authored: Thu Aug 31 10:47:36 2017 -0400
Committer: Andrew Stitcher <[email protected]>
Committed: Thu Aug 31 10:54:31 2017 -0400

----------------------------------------------------------------------
 proton-c/bindings/cpp/CMakeLists.txt            |   2 +-
 .../cpp/include/proton/connection_options.hpp   |  16 ++-
 proton-c/bindings/cpp/include/proton/fwd.hpp    |   2 +-
 .../cpp/include/proton/reconnect_options.hpp    |  84 ++++++++++++++++
 .../cpp/include/proton/reconnect_timer.hpp      |  71 -------------
 .../bindings/cpp/src/connection_options.cpp     |  20 ++--
 proton-c/bindings/cpp/src/contexts.cpp          |   7 +-
 proton-c/bindings/cpp/src/include/contexts.hpp  |  17 +++-
 .../cpp/src/include/proactor_container_impl.hpp |  11 ++
 .../cpp/src/include/reconnect_options_impl.hpp  |  41 ++++++++
 .../cpp/src/proactor_container_impl.cpp         | 100 +++++++++++++++++--
 proton-c/bindings/cpp/src/reconnect_options.cpp |  43 ++++++++
 proton-c/bindings/cpp/src/reconnect_timer.cpp   |  64 ------------
 13 files changed, 312 insertions(+), 166 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt 
b/proton-c/bindings/cpp/CMakeLists.txt
index 472105a..330858a 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -56,7 +56,7 @@ set(qpid-proton-cpp-source
   src/proton_bits.cpp
   src/receiver.cpp
   src/receiver_options.cpp
-  src/reconnect_timer.cpp
+  src/reconnect_options.cpp
   src/returned.cpp
   src/sasl.cpp
   src/scalar_base.cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp 
b/proton-c/bindings/cpp/include/proton/connection_options.hpp
index 62af5f3..066e8cf 100644
--- a/proton-c/bindings/cpp/include/proton/connection_options.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -22,12 +22,12 @@
  *
  */
 
+#include "./duration.hpp"
 #include "./fwd.hpp"
-#include "./types_fwd.hpp"
 #include "./internal/config.hpp"
 #include "./internal/export.hpp"
 #include "./internal/pn_unique_ptr.hpp"
-#include "./duration.hpp"
+#include "./types_fwd.hpp"
 
 #include <proton/type_compat.h>
 
@@ -123,13 +123,6 @@ class connection_options {
     /// container::listen.
     PN_CPP_EXTERN connection_options& password(const std::string&);
 
-    /// @cond INTERNAL
-    // XXX settle questions about reconnect_timer - consider simply
-    // reconnect_options and making reconnect_timer internal
-    /// **Experimental**
-    PN_CPP_EXTERN connection_options& reconnect(const reconnect_timer&);
-    /// @endcond
-
     /// Set SSL client options.
     PN_CPP_EXTERN connection_options& ssl_client_options(const class 
ssl_client_options&);
 
@@ -153,6 +146,11 @@ class connection_options {
     /// **Unsettled API** - Set the SASL configuration path.
     PN_CPP_EXTERN connection_options& sasl_config_path(const std::string&);
 
+    /// **Experimental** - Options for reconnect on outgoing connections.
+    PN_CPP_EXTERN connection_options& reconnect(reconnect_options &);
+
+    
+
     /// Update option values from values set in other.
     PN_CPP_EXTERN connection_options& update(const connection_options& other);
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/fwd.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/fwd.hpp 
b/proton-c/bindings/cpp/include/proton/fwd.hpp
index efbb91b..a394579 100644
--- a/proton-c/bindings/cpp/include/proton/fwd.hpp
+++ b/proton-c/bindings/cpp/include/proton/fwd.hpp
@@ -40,7 +40,7 @@ class listener;
 class receiver;
 class receiver_iterator;
 class receiver_options;
-class reconnect_timer;
+class reconnect_options;
 class sasl;
 class sender;
 class sender_iterator;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/reconnect_options.hpp 
b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
new file mode 100644
index 0000000..e8ed02c
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/reconnect_options.hpp
@@ -0,0 +1,84 @@
+#ifndef PROTON_RECONNECT_OPTIONS_HPP
+#define PROTON_RECONNECT_OPTIONS_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "./internal/export.hpp"
+#include "./internal/pn_unique_ptr.hpp"
+#include "./duration.hpp"
+#include "./source.hpp"
+
+#include <string>
+
+namespace proton {
+
+/// **Experimental** - Options that determine a series of delays to
+/// coordinate reconnection attempts.  Attempts may be open ended or
+/// limited in number.  Delays may be evenly spaced or increasing at
+/// an exponential rate.
+///
+/// Options can be "chained": each setter returns a reference to the options.
+///
+/// Normal value semantics: copy or assign creates a separate copy of
+/// the options.
+class reconnect_options {
+  public:
+
+    /// Create a set of options holding the default values.
+    PN_CPP_EXTERN reconnect_options();
+
+    /// Copy-construct an independent set of options.
+    PN_CPP_EXTERN reconnect_options(const reconnect_options&);
+
+    PN_CPP_EXTERN ~reconnect_options();
+
+    /// Copy-assign the option values from another set of options.
+    PN_CPP_EXTERN reconnect_options& operator=(const reconnect_options&);
+
+    /// Base value for recurring delay (default is 10 milliseconds).
+    PN_CPP_EXTERN reconnect_options& delay(duration);
+
+    /// Scaling multiplier for successive reconnect delays (default is 2.0).
+    PN_CPP_EXTERN reconnect_options& delay_multiplier(float);
+
+    /// Maximum delay between successive connect attempts (default
+    /// duration::FOREVER, i.e. no limit).
+    PN_CPP_EXTERN reconnect_options& max_delay(duration);
+
+    /// Maximum reconnect attempts (default 0, meaning no limit).
+    PN_CPP_EXTERN reconnect_options& max_attempts(int);
+
+    // TODO: failover_urls
+
+
+  private:
+    class impl;
+    internal::pn_unique_ptr<impl> impl_;
+
+    /// @cond INTERNAL
+  friend class container;
+    /// @endcond
+};
+
+} // proton
+
+#endif // PROTON_RECONNECT_OPTIONS_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp 
b/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp
deleted file mode 100644
index 766feb7..0000000
--- a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp
+++ /dev/null
@@ -1,71 +0,0 @@
-#ifndef PROTON_RECONNECT_TIMER_HPP
-#define PROTON_RECONNECT_TIMER_HPP
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-/// @cond INTERNAL
-/// XXX Needs more discussion
-
-#include "./internal/export.hpp"
-#include "./duration.hpp"
-#include "./timestamp.hpp"
-
-#include <proton/type_compat.h>
-
-namespace proton {
-
-/// **Experimental** - A class that generates a series of delays to
-/// coordinate reconnection attempts.  They may be open ended or
-/// limited in time.  They may be evenly spaced or doubling at an
-/// exponential rate.
-class reconnect_timer {
-  public:
-    PN_CPP_EXTERN reconnect_timer(uint32_t first = 0, int32_t max = -1, 
uint32_t increment = 100,
-                                  bool doubling = true, int32_t max_retries = 
-1, int32_t timeout = -1);
-
-    /// Indicate a successful connection, resetting the internal timer
-    /// values.
-    PN_CPP_EXTERN void reset();
-
-    /// Obtain the timer's computed time to delay before attempting a
-    /// reconnection attempt (in milliseconds).  -1 means that the
-    /// retry limit or timeout has been exceeded and reconnection
-    /// attempts should cease.
-    PN_CPP_EXTERN int next_delay(timestamp now);
-
-  private:
-    duration first_delay_;
-    duration max_delay_;
-    duration increment_;
-    bool doubling_;
-    int32_t max_retries_;
-    duration timeout_;
-    int32_t retries_;
-    duration next_delay_;
-    timestamp timeout_deadline_;
-};
-
-} // proton
-
-/// @endcond
-
-#endif // PROTON_RECONNECT_TIMER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp 
b/proton-c/bindings/cpp/src/connection_options.cpp
index 0848c73..ff97764 100644
--- a/proton-c/bindings/cpp/src/connection_options.cpp
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -18,11 +18,12 @@
  * under the License.
  *
  */
-#include "proton/fwd.hpp"
-#include "proton/connection.hpp"
 #include "proton/connection_options.hpp"
+
+#include "proton/connection.hpp"
+#include "proton/fwd.hpp"
 #include "proton/messaging_handler.hpp"
-#include "proton/reconnect_timer.hpp"
+#include "proton/reconnect_options.hpp"
 #include "proton/transport.hpp"
 #include "proton/ssl.hpp"
 #include "proton/sasl.hpp"
@@ -57,7 +58,7 @@ class connection_options::impl {
     option<std::string> virtual_host;
     option<std::string> user;
     option<std::string> password;
-    option<reconnect_timer> reconnect;
+    option<reconnect_options> reconnect;
     option<class ssl_client_options> ssl_client_options;
     option<class ssl_server_options> ssl_server_options;
     option<bool> sasl_enabled;
@@ -73,16 +74,15 @@ class connection_options::impl {
      * transport options (set once per transport over the life of the
      * connection).
      */
-    void apply_unbound(connection& c) {
+    void apply_unbound(connection& c, const connection_options& co) {
         pn_connection_t *pnc = unwrap(c);
 
         // Only apply connection options if uninit.
         bool uninit = c.uninitialized();
         if (!uninit) return;
 
-        bool outbound = !connection_context::get(pnc).listener_context_;
-        if (reconnect.set && outbound)
-            connection_context::get(pnc).reconnect.reset(new 
reconnect_timer(reconnect.value));
+        if (reconnect.set)
+            connection_context::get(pnc).reconnect_context_.reset(new 
reconnect_context(reconnect.value, co));
         if (container_id.set)
             pn_connection_set_container(pnc, container_id.value.c_str());
         if (virtual_host.set)
@@ -187,7 +187,7 @@ connection_options& connection_options::container_id(const 
std::string &id) { im
 connection_options& connection_options::virtual_host(const std::string &id) { 
impl_->virtual_host = id; return *this; }
 connection_options& connection_options::user(const std::string &user) { 
impl_->user = user; return *this; }
 connection_options& connection_options::password(const std::string &password) 
{ impl_->password = password; return *this; }
-connection_options& connection_options::reconnect(const reconnect_timer &rc) { 
impl_->reconnect = rc; return *this; }
+connection_options& connection_options::reconnect(reconnect_options &r) { 
impl_->reconnect = r; return *this; }
 connection_options& connection_options::ssl_client_options(const class 
ssl_client_options &c) { impl_->ssl_client_options = c; return *this; }
 connection_options& connection_options::ssl_server_options(const class 
ssl_server_options &c) { impl_->ssl_server_options = c; return *this; }
 connection_options& connection_options::sasl_enabled(bool b) { 
impl_->sasl_enabled = b; return *this; }
@@ -196,7 +196,7 @@ connection_options& 
connection_options::sasl_allowed_mechs(const std::string &s)
 connection_options& connection_options::sasl_config_name(const std::string &n) 
{ impl_->sasl_config_name = n; return *this; }
 connection_options& connection_options::sasl_config_path(const std::string &p) 
{ impl_->sasl_config_path = p; return *this; }
 
-void connection_options::apply_unbound(connection& c) const { 
impl_->apply_unbound(c); }
+void connection_options::apply_unbound(connection& c) const { 
impl_->apply_unbound(c, *this); }
 void connection_options::apply_bound(connection& c) const { 
impl_->apply_bound(c); }
 messaging_handler* connection_options::handler() const { return 
impl_->handler.value; }
 } // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/contexts.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/contexts.cpp 
b/proton-c/bindings/cpp/src/contexts.cpp
index 152828a..812d573 100644
--- a/proton-c/bindings/cpp/src/contexts.cpp
+++ b/proton-c/bindings/cpp/src/contexts.cpp
@@ -20,18 +20,19 @@
  */
 
 #include "contexts.hpp"
+
 #include "msg.hpp"
 #include "proton_bits.hpp"
 
 #include "proton/connection_options.hpp"
 #include "proton/error.hpp"
+#include "proton/reconnect_options.hpp"
 
 #include <proton/connection.h>
 #include <proton/object.h>
 #include <proton/link.h>
 #include <proton/listener.h>
 #include <proton/message.h>
-#include "proton/reconnect_timer.hpp"
 #include <proton/session.h>
 
 #include <typeinfo>
@@ -70,6 +71,10 @@ connection_context::connection_context() :
     container(0), default_session(0), link_gen(0), handler(0), 
listener_context_(0)
 {}
 
+reconnect_context::reconnect_context(const reconnect_options& ro, const 
connection_options& co) :
+    reconnect_options_(new reconnect_options(ro)), connection_options_(new 
connection_options(co)), retries_(0)
+{} // Keeps private copies of both option sets; the retry count starts at 0
+
 listener_context::listener_context() : listen_handler_(0) {}
 
 connection_context& connection_context::get(pn_connection_t *c) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/include/contexts.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/contexts.hpp 
b/proton-c/bindings/cpp/src/include/contexts.hpp
index 0c829db..7920d70 100644
--- a/proton-c/bindings/cpp/src/include/contexts.hpp
+++ b/proton-c/bindings/cpp/src/include/contexts.hpp
@@ -35,7 +35,7 @@ struct pn_listener_t;
 namespace proton {
 
 class proton_handler;
-class reconnect_timer;
+class connector;
 
 namespace io {class link_namer;}
 
@@ -77,6 +77,7 @@ class context {
 };
 
 class listener_context;
+class reconnect_context;
 
 // Connection context used by all connections.
 class connection_context : public context {
@@ -90,11 +91,23 @@ class connection_context : public context {
     io::link_namer* link_gen;      // Link name generator.
 
     messaging_handler* handler;
-    internal::pn_unique_ptr<reconnect_timer> reconnect;
+    std::string connected_address_;
+    internal::pn_unique_ptr<reconnect_context> reconnect_context_;
     listener_context* listener_context_;
     work_queue work_queue_;
 };
 
+// Not a context object on its own: optional reconnect state held by connection_context
+class reconnect_context {
+  public:
+    reconnect_context(const reconnect_options& ro, const connection_options& 
co);
+
+    internal::pn_unique_ptr<const reconnect_options> reconnect_options_; // Copy of the reconnect settings
+    internal::pn_unique_ptr<const connection_options> connection_options_; // Copy of the options re-applied on each reconnect
+    duration delay_; // Delay most recently computed by container::impl::next_delay()
+    int retries_; // Reconnect attempts made since the last reset
+};
+
 class listener_context : public context {
   public:
     listener_context();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp 
b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
index 0aa62a5..804908a 100644
--- a/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
+++ b/proton-c/bindings/cpp/src/include/proactor_container_impl.hpp
@@ -64,6 +64,10 @@ struct pn_event_t;
 
 namespace proton {
 
+namespace internal {
+class connector;
+}
+
 class container::impl {
   public:
     impl(container& c, const std::string& id, messaging_handler* = 0);
@@ -99,7 +103,12 @@ class container::impl {
     class container_work_queue;
     pn_listener_t* listen_common_lh(const std::string&);
     pn_connection_t* make_connection_lh(const url& url, const 
connection_options&);
+    void setup_connection_lh(const url& url, pn_connection_t *pnc);
     void start_connection(const url& url, pn_connection_t* c);
+    void reconnect(pn_connection_t* pnc);
+    duration next_delay(reconnect_context& rc);
+    bool setup_reconnect(pn_connection_t* pnc);
+    void reset_reconnect(pn_connection_t* pnc);
 
     // Event loop to run in each container thread
     void thread();
@@ -136,9 +145,11 @@ class container::impl {
     proton::sender_options sender_options_;
     proton::receiver_options receiver_options_;
     error_condition disconnect_error_;
+    int retries_;
 
     bool auto_stop_;
     bool stopping_;
+    friend class connector;
 };
 
 template <class T>

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp 
b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
new file mode 100644
index 0000000..fc90508
--- /dev/null
+++ b/proton-c/bindings/cpp/src/include/reconnect_options_impl.hpp
@@ -0,0 +1,41 @@
+#ifndef PROTON_CPP_RECONNECT_OPTIONSIMPL_H
+#define PROTON_CPP_RECONNECT_OPTIONSIMPL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/duration.hpp"
+
+namespace proton {
+
+class reconnect_options::impl { // Private data behind proton::reconnect_options
+  public:
+    impl() : delay(10), delay_multiplier(2.0), max_delay(duration::FOREVER), 
max_attempts(0) {}
+
+    duration delay; // Base delay, used for the first delayed retry
+    float    delay_multiplier; // Factor applied to the previous delay on each later retry
+    duration max_delay; // Upper bound applied to the scaled delay
+    int      max_attempts; // Retry limit; 0 means no limit
+};
+
+}
+
+#endif  /*!PROTON_CPP_RECONNECT_OPTIONSIMPL_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/proactor_container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proactor_container_impl.cpp 
b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
index 9870210..ff4d4bb 100644
--- a/proton-c/bindings/cpp/src/proactor_container_impl.cpp
+++ b/proton-c/bindings/cpp/src/proactor_container_impl.cpp
@@ -24,6 +24,7 @@
 #include "proton/function.hpp"
 #include "proton/listener.hpp"
 #include "proton/listen_handler.hpp"
+#include "proton/reconnect_options.hpp"
 #include "proton/url.hpp"
 
 #include "proton/connection.h"
@@ -33,6 +34,7 @@
 
 #include "contexts.hpp"
 #include "messaging_adapter.hpp"
+#include "reconnect_options_impl.hpp"
 #include "proton_bits.hpp"
 
 #include <assert.h>
@@ -152,6 +154,15 @@ void 
container::impl::remove_work_queue(container::impl::container_work_queue* l
     work_queues_.erase(l);
 }
 
+void container::impl::setup_connection_lh(const url& url, pn_connection_t 
*pnc) { // NOTE(review): "_lh" presumably means the caller holds the container lock - confirm
+    pn_connection_set_container(pnc, id_.c_str()); // Container id comes from this container, not the URL
+    pn_connection_set_hostname(pnc, url.host().c_str());
+    if (!url.user().empty()) // Set the user only when the URL carries one
+        pn_connection_set_user(pnc, url.user().c_str());
+    if (!url.password().empty()) // Likewise for the password
+        pn_connection_set_password(pnc, url.password().c_str());
+}
+
 pn_connection_t* container::impl::make_connection_lh(
     const url& url,
     const connection_options& user_opts)
@@ -169,14 +180,10 @@ pn_connection_t* container::impl::make_connection_lh(
     cc.handler = mh;
     cc.work_queue_ = new 
container::impl::connection_work_queue(*container_.impl_, pnc);
 
-    pn_connection_set_container(pnc, id_.c_str());
-    pn_connection_set_hostname(pnc, url.host().c_str());
-    if (!url.user().empty())
-        pn_connection_set_user(pnc, url.user().c_str());
-    if (!url.password().empty())
-        pn_connection_set_password(pnc, url.password().c_str());
-
+    cc.connected_address_ = url;
+    setup_connection_lh(url, pnc);
     make_wrapper(pnc).open(opts);
+
     return pnc;                 // 1 refcount from pn_connection()
 }
 
@@ -186,6 +193,70 @@ void container::impl::start_connection(const url& url, 
pn_connection_t *pnc) {
     pn_proactor_connect(proactor_, pnc, caddr); // Takes ownership of pnc
 }
 
+void container::impl::reconnect(pn_connection_t* pnc) { // Scheduled by setup_reconnect() to re-open pnc
+    connection_context& cc = connection_context::get(pnc);
+    reconnect_context* rc = cc.reconnect_context_.get(); // NOTE(review): dereferenced unchecked below; setup_reconnect() only schedules this when it is non-null
+
+    // Figure out next connection url to try (currently always the last address used)
+    const proton::url url(cc.connected_address_);
+
+    cc.connected_address_ = url;
+    setup_connection_lh(url, pnc);
+    make_wrapper(pnc).open(*rc->connection_options_); // Re-apply the options saved when the connection was first opened
+    start_connection(cc.connected_address_, pnc);
+    rc->retries_++; // Counted after the attempt is started; reset by reset_reconnect()
+}
+
+duration container::impl::next_delay(reconnect_context& rc) { // Computes the wait before the next attempt and stores it in rc.delay_
+    // If we've not retried before do it immediately
+    if (rc.retries_==0) return duration(0);
+
+    const reconnect_options::impl& roi = *rc.reconnect_options_->impl_;
+    if (rc.retries_==1) {
+        rc.delay_ = roi.delay; // Second attempt: use the base delay. NOTE(review): not capped by max_delay here
+    } else {
+        rc.delay_ = std::min(roi.max_delay, rc.delay_ * roi.delay_multiplier); // Later attempts: scale the previous delay, capped at max_delay
+    }
+    return rc.delay_;
+}
+
+void container::impl::reset_reconnect(pn_connection_t* pnc) { // Restarts the retry sequence for pnc
+    connection_context& cc = connection_context::get(pnc);
+    reconnect_context* rc = cc.reconnect_context_.get();
+
+    if (rc) rc->retries_ = 0; // No-op without a reconnect context; delay_ is recomputed by next_delay() so is not reset
+}
+
+bool container::impl::setup_reconnect(pn_connection_t* pnc) { // Returns true if a reconnect attempt was scheduled
+    connection_context& cc = connection_context::get(pnc);
+    reconnect_context* rc = cc.reconnect_context_.get();
+
+    // If reconnect not enabled just fail
+    if (!rc) return false;
+
+    const reconnect_options::impl& roi = *rc->reconnect_options_->impl_;
+
+    // If too many reconnect attempts just fail
+    if ( roi.max_attempts != 0 && rc->retries_ >= roi.max_attempts) { // max_attempts == 0 means no limit
+        pn_transport_t* t = pn_connection_transport(pnc);
+        pn_condition_t* condition = pn_transport_condition(t);
+        pn_condition_format(condition, "proton:io", "Too many reconnect 
attempts (%d)", rc->retries_); // Records the give-up reason on the transport condition
+        return false;
+    }
+
+    // Recover connection from proactor
+    pn_proactor_release_connection(pnc);
+
+    // Figure out delay till next reconnect
+    duration delay = next_delay(*rc);
+
+    // Schedule reconnect - can do this on container work queue as no one can 
have the connection
+    // now anyway
+    schedule(delay, make_work(&container::impl::reconnect, this, pnc));
+
+    return true;
+}
+
 returned<connection> container::impl::connect(
     const std::string& addr,
     const proton::connection_options& user_opts)
@@ -417,6 +488,21 @@ bool container::impl::handle(pn_event_t* event) {
 
         return false;
     }
+    case PN_CONNECTION_REMOTE_OPEN: {
+        // This is the only event that we get indicating that the connection 
succeeded so
+        // it's the only place to reset the reconnection logic.
+        //
+        // Just note we have a connection then process normally
+        pn_connection_t* c = pn_event_connection(event);
+        reset_reconnect(c);
+        break;
+    }
+    case PN_TRANSPORT_CLOSED: {
+        // If reconnect is turned on then handle closed on error here with 
reconnect attempt
+        pn_connection_t* c = pn_event_connection(event);
+        pn_transport_t* t = pn_event_transport(event);
+        if (pn_condition_is_set(pn_transport_condition(t)) && 
setup_reconnect(c)) return false;
+    }
     default:
         break;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/reconnect_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_options.cpp 
b/proton-c/bindings/cpp/src/reconnect_options.cpp
new file mode 100644
index 0000000..ef0d497
--- /dev/null
+++ b/proton-c/bindings/cpp/src/reconnect_options.cpp
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/reconnect_options.hpp"
+#include "reconnect_options_impl.hpp"
+
+namespace proton {
+
+reconnect_options::reconnect_options() : impl_(new impl()) {} // impl() supplies the default values
+reconnect_options::reconnect_options(const reconnect_options& x) : impl_(new 
impl()) {
+    *this = x; // Allocate a fresh impl first, then reuse operator= to copy the values
+}
+reconnect_options::~reconnect_options() {} // Defined here, where impl is a complete type
+
+reconnect_options& reconnect_options::operator=(const reconnect_options& x) {
+    *impl_ = *x.impl_; // Copies the values into our own impl; the two objects never share an impl
+    return *this;
+}
+
+reconnect_options& reconnect_options::delay(duration d) { impl_->delay = d; 
return *this; } // Each setter stores its value and returns *this for chaining
+reconnect_options& reconnect_options::delay_multiplier(float f) { 
impl_->delay_multiplier = f; return *this; }
+reconnect_options& reconnect_options::max_delay(duration d) { impl_->max_delay 
= d; return *this; }
+reconnect_options& reconnect_options::max_attempts(int i) { 
impl_->max_attempts = i; return *this; } // 0 means no limit
+
+} // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/740b9509/proton-c/bindings/cpp/src/reconnect_timer.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp 
b/proton-c/bindings/cpp/src/reconnect_timer.cpp
deleted file mode 100644
index a299b0e..0000000
--- a/proton-c/bindings/cpp/src/reconnect_timer.cpp
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "proton/reconnect_timer.hpp"
-#include "proton/error.hpp"
-#include "msg.hpp"
-#include <proton/types.h>
-
-namespace proton {
-
-reconnect_timer::reconnect_timer(uint32_t first, int32_t max, uint32_t 
increment,
-                                 bool doubling, int32_t max_retries, int32_t 
timeout) :
-    first_delay_(first), max_delay_(max), increment_(increment), 
doubling_(doubling),
-    max_retries_(max_retries), timeout_(timeout), retries_(0), 
next_delay_(-1), timeout_deadline_(0)
-    {}
-
-void reconnect_timer::reset() {
-    retries_ = 0;
-    next_delay_ = 0;
-    timeout_deadline_ = 0;
-}
-
-int reconnect_timer::next_delay(timestamp now) {
-    retries_++;
-    if (max_retries_ >= 0 && retries_ > max_retries_)
-        return -1;
-
-    if (retries_ == 1) {
-        if (timeout_ >= duration(0))
-            timeout_deadline_ = now + timeout_;
-        next_delay_ = first_delay_;
-    } else if (retries_ == 2) {
-        next_delay_ = next_delay_ + increment_;
-    } else {
-        next_delay_ = next_delay_ + ( doubling_ ? next_delay_ : increment_ );
-    }
-    if (timeout_deadline_ != timestamp(0) && now >= timeout_deadline_)
-        return -1;
-    if (max_delay_ >= duration(0) && next_delay_ > max_delay_)
-        next_delay_ = max_delay_;
-    if (timeout_deadline_ != timestamp(0) && (now + next_delay_ > 
timeout_deadline_))
-        next_delay_ = timeout_deadline_ - now;
-    return next_delay_.milliseconds();
-}
-
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to