Repository: qpid-proton
Updated Branches:
  refs/heads/master 587f3cd8f -> 8e61c86ac


PROTON-1066: connection options and reconnect for C++ binding


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/8e61c86a
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/8e61c86a
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/8e61c86a

Branch: refs/heads/master
Commit: 8e61c86ac47f3613ab3f28dfdb4aca0db95482fb
Parents: 587f3cd
Author: Clifford Jansen <[email protected]>
Authored: Wed Nov 25 16:10:45 2015 -0800
Committer: Clifford Jansen <[email protected]>
Committed: Wed Nov 25 16:14:21 2015 -0800

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |   1 +
 examples/cpp/connection_options.cpp             |  73 +++++++++++
 proton-c/bindings/cpp/CMakeLists.txt            |   2 +
 .../bindings/cpp/include/proton/connection.hpp  |  10 +-
 .../cpp/include/proton/connection_options.hpp   | 104 ++++++++++++++++
 .../bindings/cpp/include/proton/container.hpp   |  16 ++-
 .../cpp/include/proton/reconnect_timer.hpp      |  63 ++++++++++
 .../bindings/cpp/include/proton/transport.hpp   |  13 +-
 .../cpp/src/blocking_connection_impl.cpp        |   3 +-
 proton-c/bindings/cpp/src/connection.cpp        |  13 +-
 .../bindings/cpp/src/connection_options.cpp     | 124 +++++++++++++++++++
 proton-c/bindings/cpp/src/connector.cpp         |  67 +++++++++-
 proton-c/bindings/cpp/src/connector.hpp         |  15 ++-
 proton-c/bindings/cpp/src/container.cpp         |  14 ++-
 proton-c/bindings/cpp/src/container_impl.cpp    |  77 ++++++++++--
 proton-c/bindings/cpp/src/container_impl.hpp    |   9 +-
 proton-c/bindings/cpp/src/reconnect_timer.cpp   |  69 +++++++++++
 proton-c/bindings/cpp/src/transport.cpp         |  36 ++++++
 18 files changed, 680 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 8916963..8890e2d 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -35,6 +35,7 @@ set(examples
   server
   server_direct
   recurring_timer
+  connection_options
   encode_decode)
 
 if (NOT WIN32)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/examples/cpp/connection_options.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/connection_options.cpp 
b/examples/cpp/connection_options.cpp
new file mode 100644
index 0000000..da3c4d9
--- /dev/null
+++ b/examples/cpp/connection_options.cpp
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/container.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/url.hpp"
+#include "proton/transport.hpp"
+
+#include <iostream>
+
+using proton::connection_options;
+
+class handler_2 : public proton::messaging_handler {
+    void on_connection_opened(proton::event &e) {
+        std::cout << "connection events going to handler_2" << std::endl;
+        std::cout << "connection max_frame_size: " << 
e.connection().transport().max_frame_size() <<
+            ", idle timeout: " << e.connection().transport().idle_timeout() << 
std::endl;
+        e.connection().close();
+    }
+};
+
+class main_handler : public proton::messaging_handler {
+  private:
+    proton::url url;
+    handler_2 conn_handler;
+
+  public:
+    main_handler(const proton::url& u) : url(u) {}
+
+    void on_start(proton::event &e) {
+        // Connection options for this connection.  Merged with and overriding 
the container's
+        // client_connection_options() settings. 
+        e.container().connect(url, 
connection_options().handler(&conn_handler).max_frame_size(2468));
+    }
+
+    void on_connection_opened(proton::event &e) {
+        std::cout << "unexpected connection event on main handler" << 
std::endl;
+        e.connection().close();
+    }
+};
+
+int main(int argc, char **argv) {
+    try {
+        std::string url = argc > 1 ? argv[1] : "127.0.0.1:5672/examples";
+        main_handler handler(url);
+        proton::container container(handler);
+        // Global connection options for future connections on container.
+        
container.client_connection_options(connection_options().max_frame_size(12345).idle_timeout(15000));
+        container.run();
+        return 0;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt 
b/proton-c/bindings/cpp/CMakeLists.txt
index bf9af46..47ca937 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -33,6 +33,7 @@ set(qpid-proton-cpp-source
   src/blocking_receiver.cpp
   src/blocking_sender.cpp
   src/connection.cpp
+  src/connection_options.cpp
   src/connector.cpp
   src/container.cpp
   src/container_impl.cpp
@@ -59,6 +60,7 @@ set(qpid-proton-cpp-source
   src/proton_handler.cpp
   src/reactor.cpp
   src/receiver.cpp
+  src/reconnect_timer.cpp
   src/request_response.cpp
   src/sender.cpp
   src/session.cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp 
b/proton-c/bindings/cpp/include/proton/connection.hpp
index b662b9c..bb438ed 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -26,6 +26,7 @@
 #include "proton/link.hpp"
 #include "proton/object.hpp"
 #include "proton/session.hpp"
+#include "proton/connection_options.hpp"
 #include "proton/types.h"
 #include <string>
 
@@ -52,9 +53,12 @@ class connection : public object<pn_connection_t>, endpoint
     /// Get the container, throw an exception if event_loop is not a container.
     PN_CPP_EXTERN class container &container() const;
 
-    /// Get the engine, , throw an exception if event_loop is not an engine.
+    /// Get the engine, throw an exception if event_loop is not an engine.
     PN_CPP_EXTERN class engine &engine() const;
 
+    /// Get the transport for the connection.
+    PN_CPP_EXTERN class transport transport() const;
+
     /// Return the AMQP host name for the connection.
     PN_CPP_EXTERN std::string host() const;
 
@@ -102,6 +106,10 @@ class connection : public object<pn_connection_t>, endpoint
 
     /** Get the endpoint state */
     PN_CPP_EXTERN endpoint::state state() const;
+
+    friend class connection_options;
+    friend class connector;
+    friend class transport;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/connection_options.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection_options.hpp 
b/proton-c/bindings/cpp/include/proton/connection_options.hpp
new file mode 100644
index 0000000..2755adf
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/connection_options.hpp
@@ -0,0 +1,104 @@
+#ifndef PROTON_CPP_CONNECTION_OPTIONS_H
+#define PROTON_CPP_CONNECTION_OPTIONS_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/config.hpp"
+#include "proton/export.hpp"
+#include "proton/pn_unique_ptr.hpp"
+#include "proton/reconnect_timer.hpp"
+#include "proton/types.hpp"
+//#include "proton/ssl.hpp"
+
+#include <vector>
+#include <string>
+
+namespace proton {
+
+class handler;
+class connection;
+
+/** Options for creating a connection.
+ *
+ * Options can be "chained" like this:
+ *
+ * c = container.connect(url, 
connection_options().handler(h).max_frame_size(1234));
+ *
+ * You can also create an options object with common settings and use it as a 
base
+ * for different connections that have mostly the same settings:
+ *
+ * connection_options opts;
+ * opts.idle_timeout(1000).max_frame_size(10000);
+ * c1 = container.connect(url1, opts.handler(h1));
+ * c2 = container.connect(url2, opts.handler(h2));
+ *
+ * Normal value semantics, copy or assign creates a separate copy of the 
options.
+ */
+class connection_options {
+  public:
+    PN_CPP_EXTERN connection_options();
+    PN_CPP_EXTERN connection_options(const connection_options&);
+    PN_CPP_EXTERN ~connection_options();
+    PN_CPP_EXTERN connection_options& operator=(const connection_options&);
+
+    /// Override with options from other.
+    PN_CPP_EXTERN void override(const connection_options& other);
+
+    // TODO: Document options
+    
+    PN_CPP_EXTERN connection_options& handler(class handler *);
+    PN_CPP_EXTERN connection_options& max_frame_size(uint32_t max);
+    PN_CPP_EXTERN connection_options& max_channels(uint16_t max);
+    PN_CPP_EXTERN connection_options& idle_timeout(uint32_t t);
+    PN_CPP_EXTERN connection_options& heartbeat(uint32_t t);
+    PN_CPP_EXTERN connection_options& container_id(const std::string &id);
+    PN_CPP_EXTERN connection_options& reconnect(const reconnect_timer &);
+#ifdef PN_CPP_SOON
+    PN_CPP_EXTERN connection_options& client_domain(const class client_domain 
&);
+    PN_CPP_EXTERN connection_options& server_domain(const class server_domain 
&);
+    PN_CPP_EXTERN connection_options& peer_hostname(const std::string &name);
+    PN_CPP_EXTERN connection_options& resume_id(const std::string &id);
+    PN_CPP_EXTERN connection_options& sasl_enabled(bool);
+    PN_CPP_EXTERN connection_options& allow_insecure_mechs(bool);
+    PN_CPP_EXTERN connection_options& allowed_mechs(const std::string &);
+#endif
+  private:
+    void apply(connection&) const;
+    class handler* handler() const;
+    static pn_connection_t *pn_connection(connection &);
+#ifdef PN_CPP_SOON
+    bool sasl_enabled() const;
+    bool allow_insecure_mechs() const;
+    std::string *allowed_mechs() const;
+    class client_domain &client_domain();
+    class server_domain &server_domain();
+#endif
+
+    class impl;
+    pn_unique_ptr<impl> impl_;
+
+  friend class container_impl;
+  friend class connector;
+};
+
+} // namespace
+
+#endif  /*!PROTON_CPP_CONNECTION_OPTIONS_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp 
b/proton-c/bindings/cpp/include/proton/container.hpp
index 984d37a..2cebcd4 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -27,6 +27,7 @@
 #include "proton/pn_unique_ptr.hpp"
 #include "proton/reactor.hpp"
 #include "proton/url.hpp"
+#include "proton/connection_options.hpp"
 
 #include <string>
 
@@ -59,10 +60,10 @@ class container : public event_loop {
     PN_CPP_EXTERN ~container();
 
     /** Locally open a connection @see connection::open  */
-    PN_CPP_EXTERN connection connect(const proton::url&, handler *h=0);
+    PN_CPP_EXTERN connection connect(const proton::url&, const 
connection_options &opts = connection_options());
 
     /** Open a connection to url and create a receiver with source=url.path() 
*/
-    PN_CPP_EXTERN acceptor listen(const proton::url &);
+    PN_CPP_EXTERN acceptor listen(const proton::url&, const connection_options 
&opts = connection_options());
 
     /** Run the event loop, return when all connections and acceptors are 
closed. */
     PN_CPP_EXTERN void run();
@@ -82,6 +83,17 @@ class container : public event_loop {
     // Schedule a timer task event in delay milliseconds.
     PN_CPP_EXTERN task schedule(int delay, handler *h = 0);
 
+    /** Copy the connection options to a template which will be
+        applied to subsequent outgoing connections.  These are applied first
+        and overridden by additional connection options provided in
+        other methods */
+    PN_CPP_EXTERN void client_connection_options(const connection_options &);
+
+    /** Copy the connection options to a template which will be
+        applied to incoming connections.  These are applied before the
+        first open event on the connection. */
+    PN_CPP_EXTERN void server_connection_options(const connection_options &);
+
   private:
     pn_unique_ptr<container_impl> impl_;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp 
b/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp
new file mode 100644
index 0000000..f89f47e
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/reconnect_timer.hpp
@@ -0,0 +1,63 @@
+#ifndef PROTON_CPP_RECONNECT_H
+#define PROTON_CPP_RECONNECT_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/export.hpp"
+#include "proton/types.hpp"
+#include "proton/reactor.hpp"
+#include <string>
+
+namespace proton {
+
+/** A class that generates a series of delays to coordinate reconnection 
attempts.  They may be open ended or limited in time.  They may be evenly 
spaced or doubling at an exponential rate. */
+class reconnect_timer
+{
+  public:
+    /** TODO:
+     */
+    PN_CPP_EXTERN reconnect_timer(uint32_t first = 0, int32_t max = -1, 
uint32_t increment = 100,
+                                  bool doubling = true, int32_t max_retries = 
-1, int32_t timeout = -1);
+
+    /** Indicate a successful connection, resetting the internal timer values 
*/
+    PN_CPP_EXTERN void reset();
+
+    /** Obtain the timer's computed time to delay before attempting a 
reconnection attempt (in milliseconds).  -1 means that the retry limit or 
timeout has been exceeded and reconnection attempts should cease. */
+    PN_CPP_EXTERN int next_delay();
+
+  private:
+    int32_t first_delay_;
+    int32_t max_delay_;
+    int32_t increment_;
+    bool doubling_;
+    int32_t max_retries_;
+    int32_t timeout_;
+    int32_t retries_;
+    int32_t next_delay_;
+    pn_timestamp_t timeout_deadline_;
+    reactor reactor_;
+    friend class connector;
+};
+
+}
+
+#endif  /*!PROTON_CPP_RECONNECT_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/include/proton/transport.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/transport.hpp 
b/proton-c/bindings/cpp/include/proton/transport.hpp
index d60786a..0b73a8c 100644
--- a/proton-c/bindings/cpp/include/proton/transport.hpp
+++ b/proton-c/bindings/cpp/include/proton/transport.hpp
@@ -23,7 +23,7 @@
  */
 
 #include "proton/object.hpp"
-
+#include "proton/types.hpp"
 #include "proton/export.hpp"
 
 struct pn_transport_t;
@@ -38,7 +38,16 @@ class transport : public object<pn_transport_t>
   public:
     transport(pn_transport_t* t) : object<pn_transport_t>(t) {}
 
-    class connection connection() const;
+    PN_CPP_EXTERN class connection connection() const;
+    PN_CPP_EXTERN void unbind();
+    PN_CPP_EXTERN void bind(class connection &);
+    PN_CPP_EXTERN uint32_t max_frame_size() const;
+    PN_CPP_EXTERN uint32_t remote_max_frame_size() const;
+    PN_CPP_EXTERN uint16_t max_channels() const;
+    PN_CPP_EXTERN uint16_t remote_max_channels() const;
+    PN_CPP_EXTERN uint32_t idle_timeout() const;
+    PN_CPP_EXTERN uint32_t remote_idle_timeout() const;
+    friend class connection_options;
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp 
b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
index 65ae9bb..7a5d882 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp
@@ -50,7 +50,8 @@ blocking_connection_impl::blocking_connection_impl(const url& 
url, duration time
 {
     container_->reactor().start();
     container_->reactor().timeout(timeout);
-    connection_ = container_->connect(url, this); // Set this as handler.
+    handler *h = static_cast<handler*>(this); // Set this as handler.
+    connection_ = container_->connect(url, connection_options().handler(h));
     wait(connection_opening(connection_));
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp 
b/proton-c/bindings/cpp/src/connection.cpp
index 11cc29b..5226a12 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -24,6 +24,7 @@
 #include "proton/handler.hpp"
 #include "proton/session.hpp"
 #include "proton/error.hpp"
+#include "connector.hpp"
 
 #include "msg.hpp"
 #include "contexts.hpp"
@@ -39,7 +40,17 @@ namespace proton {
 
 connection_context& connection::context() const { return 
connection_context::get(pn_object()); }
 
-void connection::open() { pn_connection_open(pn_object()); }
+transport connection::transport() const {
+    return pn_connection_transport(pn_object());
+}
+
+void connection::open() {
+    connector *connector = dynamic_cast<class 
connector*>(context().handler.get());
+    if (connector)
+        connector->apply_options();
+    // Inbound connections should already be configured.
+    pn_connection_open(pn_object());
+}
 
 void connection::close() { pn_connection_close(pn_object()); }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connection_options.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_options.cpp 
b/proton-c/bindings/cpp/src/connection_options.cpp
new file mode 100644
index 0000000..6d5b829
--- /dev/null
+++ b/proton-c/bindings/cpp/src/connection_options.cpp
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/connection_options.hpp"
+#include "proton/reconnect_timer.hpp"
+#include "proton/transport.hpp"
+#include "contexts.hpp"
+#include "connector.hpp"
+#include "msg.hpp"
+
+#include "proton/transport.h"
+
+namespace proton {
+
+// Holder for one optional setting: a value together with a flag that
+// records whether the value has been explicitly assigned.
+template <class T> struct option {
+    T value;
+    bool set;
+
+    // Starts out unset, with a value-initialized T.
+    option() : value(), set(false) {}
+    // Store x and mark the option as set.
+    option& operator=(const T& x) { value = x;  set = true; return *this; }
+    // Take x's value only if x has been set; otherwise leave this option as it is.
+    void override(const option<T>& x) { if (x.set) *this = x.value; }
+};
+
+// Private implementation of connection_options: one option<T> slot per
+// setting, each remembering whether it has been explicitly set.
+class connection_options::impl {
+  public:
+    option<class handler*> handler;
+    option<uint32_t> max_frame_size;
+    option<uint16_t> max_channels;
+    option<uint32_t> idle_timeout;
+    // NOTE: heartbeat is stored and merged by override(), but apply() below
+    // does not pass it on to the connection or transport.
+    option<uint32_t> heartbeat;
+    option<std::string> container_id;
+    option<reconnect_timer> reconnect;
+    // Same guard macro as the matching declarations in
+    // proton/connection_options.hpp.
+#ifdef PN_CPP_SOON
+    option<class client_domain> client_domain;
+    option<class server_domain> server_domain;
+    option<std::string> peer_hostname;
+    option<std::string> resume_id;
+    option<bool> sasl_enabled;
+    option<std::string> allowed_mechs;
+    option<bool> allow_insecure_mechs;
+#endif
+
+    // Apply the options that have been set to connection c.
+    //
+    // Transport settings (max frame size, channel max, idle timeout) are
+    // written only when the connection currently has a transport and either
+    // the connection is locally uninitialized or its handler is a connector
+    // whose transport has not been configured yet.  The reconnect timer and
+    // the container id are written only while the connection is locally
+    // uninitialized.
+    void apply(connection& c) {
+        pn_connection_t *pnc = connection_options::pn_connection(c);
+        pn_transport_t *pnt = pn_connection_transport(pnc);
+        // Non-null only when the connection's handler is a connector.
+        connector *outbound = dynamic_cast<connector*>(c.context().handler.get());
+        bool uninit = (c.state() & endpoint::LOCAL_UNINIT);
+
+        // pnt is NULL between reconnect attempts.
+        // Only apply transport options if uninit or outbound with
+        // transport not yet configured.
+        if (pnt && (uninit || (outbound && !outbound->transport_configured())))
+        {
+            if (max_frame_size.set)
+                pn_transport_set_max_frame(pnt, max_frame_size.value);
+            if (max_channels.set)
+                pn_transport_set_channel_max(pnt, max_channels.value);
+            if (idle_timeout.set)
+                pn_transport_set_idle_timeout(pnt, idle_timeout.value);
+        }
+        // Only apply connection options if uninit.
+        if (uninit) {
+            if (reconnect.set && outbound)
+                outbound->reconnect_timer(reconnect.value);
+            if (container_id.set)
+                pn_connection_set_container(pnc, container_id.value.c_str());
+        }
+    }
+
+    // Merge x into this object: every option that is set in x replaces the
+    // corresponding option here; options not set in x are left untouched.
+    void override(const impl& x) {
+        handler.override(x.handler);
+        max_frame_size.override(x.max_frame_size);
+        max_channels.override(x.max_channels);
+        idle_timeout.override(x.idle_timeout);
+        heartbeat.override(x.heartbeat);
+        container_id.override(x.container_id);
+        reconnect.override(x.reconnect);
+    }
+
+};
+
+connection_options::connection_options() : impl_(new impl()) {}
+connection_options::connection_options(const connection_options& x) : 
impl_(new impl()) {
+    *this = x;
+}
+connection_options::~connection_options() {}
+
+connection_options& connection_options::operator=(const connection_options& x) 
{
+    *impl_ = *x.impl_;
+    return *this;
+}
+
+void connection_options::override(const connection_options& x) { 
impl_->override(*x.impl_); }
+
+connection_options& connection_options::handler(class handler *h) { 
impl_->handler = h; return *this; }
+connection_options& connection_options::max_frame_size(uint32_t n) { 
impl_->max_frame_size = n; return *this; }
+// Set the maximum channel option.  The value goes into the max_channels slot,
+// which impl::apply() hands to pn_transport_set_channel_max(); it must not
+// touch the max_frame_size slot.
+connection_options& connection_options::max_channels(uint16_t n) { impl_->max_channels = n; return *this; }
+connection_options& connection_options::idle_timeout(uint32_t t) { 
impl_->idle_timeout = t; return *this; }
+connection_options& connection_options::heartbeat(uint32_t t) { 
impl_->heartbeat = t; return *this; }
+connection_options& connection_options::container_id(const std::string &id) { 
impl_->container_id = id; return *this; }
+connection_options& connection_options::reconnect(const reconnect_timer &rc) { 
impl_->reconnect = rc; return *this; }
+
+void connection_options::apply(connection& c) const { impl_->apply(c); }
+handler* connection_options::handler() const { return impl_->handler.value; }
+
+pn_connection_t* connection_options::pn_connection(connection &c) { return 
c.pn_object(); }
+} // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connector.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connector.cpp 
b/proton-c/bindings/cpp/src/connector.cpp
index a8614cd..2547ff5 100644
--- a/proton-c/bindings/cpp/src/connector.cpp
+++ b/proton-c/bindings/cpp/src/connector.cpp
@@ -19,43 +19,98 @@
  *
  */
 
+#include "connector.hpp"
 #include "proton/connection.hpp"
 #include "proton/transport.hpp"
 #include "proton/container.hpp"
 #include "proton/event.hpp"
-#include "proton/connection.h"
 #include "proton/url.hpp"
+#include "proton/reconnect_timer.hpp"
+#include "proton/task.hpp"
+#include "container_impl.hpp"
 
-#include "connector.hpp"
+#include "proton/connection.h"
+#include "proton/transport.h"
 
 namespace proton {
 
-connector::connector(connection &c) : connection_(c) {}
+connector::connector(connection&c, const connection_options &opts) :
+    connection_(c), options_(opts), reconnect_timer_(0), 
transport_configured_(false)
+{}
 
-connector::~connector() {}
+connector::~connector() { delete reconnect_timer_; }
 
 void connector::address(const url &a) {
     address_ = a;
 }
 
+void connector::apply_options() {
+    if (!connection_) return;
+    options_.apply(connection_);
+}
+
+bool connector::transport_configured() { return transport_configured_; }
+
+void connector::reconnect_timer(const class reconnect_timer &rt) {
+    delete reconnect_timer_;
+    reconnect_timer_ = new class reconnect_timer(rt);
+    reconnect_timer_->reactor_ = connection_.container().reactor();
+}
+
 void connector::connect() {
     connection_.container_id(connection_.container().id());
     connection_.host(address_.host_port());
+    transport t(pn_transport());
+    t.bind(connection_);
+    // Apply options to the new transport.
+    options_.apply(connection_);
+    transport_configured_ = true;
 }
 
 void connector::on_connection_local_open(event &e) {
     connect();
 }
 
-void connector::on_connection_remote_open(event &e) {}
+void connector::on_connection_remote_open(event &e) {
+    if (reconnect_timer_) {
+        reconnect_timer_->reset();
+    }
+}
 
 void connector::on_connection_init(event &e) {
 }
 
+void connector::on_transport_tail_closed(event &e) {
+    on_transport_closed(e);
+}
+
 void connector::on_transport_closed(event &e) {
-    // TODO: prepend with reconnect logic
+    if (!connection_) return;
+    if (connection_.state() & endpoint::LOCAL_ACTIVE) {
+        if (reconnect_timer_) {
+            e.connection().transport().unbind();
+            transport_configured_ = false;
+            int delay = reconnect_timer_->next_delay();
+            if (delay >= 0) {
+                if (delay == 0) {
+                    // log "Disconnected, reconnecting..."
+                    connect();
+                    return;
+                }
+                else {
+                    // log "Disconnected, reconnecting in " <<  delay << " 
milliseconds"
+                    connection_.container().schedule(delay, this);
+                    return;
+                }
+            }
+        }
+    }
     connection_.release();
     connection_  = 0;
 }
 
+void connector::on_timer_task(event &e) {
+    connect();
+}
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/connector.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connector.hpp 
b/proton-c/bindings/cpp/src/connector.hpp
index 818282a..458e2ba 100644
--- a/proton-c/bindings/cpp/src/connector.hpp
+++ b/proton-c/bindings/cpp/src/connector.hpp
@@ -23,9 +23,10 @@
  */
 
 #include "proton/proton_handler.hpp"
+#include "proton/connection_options.hpp"
+#include "proton/url.hpp"
 #include "proton/event.h"
 #include "proton/reactor.h"
-#include "proton/url.h"
 #include <string>
 
 
@@ -34,22 +35,32 @@ namespace proton {
 class event;
 class connection;
 class transport;
+class reconnect_timer;
 
 class connector : public proton_handler
 {
   public:
-    connector(connection &c);
+    connector(connection &c, const connection_options &opts);
     ~connector();
     void address(const url&);
+    const url &address() const { return address_; }
     void connect();
+    void apply_options();
+    void reconnect_timer(const class reconnect_timer &);
+    bool transport_configured();
     virtual void on_connection_local_open(event &e);
     virtual void on_connection_remote_open(event &e);
     virtual void on_connection_init(event &e);
     virtual void on_transport_closed(event &e);
+    virtual void on_transport_tail_closed(event &e);
+    virtual void on_timer_task(event &e);
 
   private:
     connection connection_;
     url address_;
+    connection_options options_;
+    class reconnect_timer *reconnect_timer_;
+    bool transport_configured_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp 
b/proton-c/bindings/cpp/src/container.cpp
index f19b72b..34bd2a0 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -48,7 +48,9 @@ container::container(messaging_handler &mhandler, const 
std::string& id) :
 
 container::~container() {}
 
-connection container::connect(const url &host, handler *h) { return 
impl_->connect(host, h); }
+connection container::connect(const url &host, const connection_options &opts) 
{
+    return impl_->connect(host, opts);
+}
 
 reactor container::reactor() const { return impl_->reactor_; }
 
@@ -64,10 +66,18 @@ receiver container::open_receiver(const proton::url &url) {
     return impl_->open_receiver(url);
 }
 
-acceptor container::listen(const proton::url &url) {
+acceptor container::listen(const proton::url &url, const connection_options 
&opts) {
+#ifdef PN_COMING_SOON
+    return impl_->listen(url, opts);  // options are forwarded only in PN_COMING_SOON builds
+#else
     return impl_->listen(url);
+#endif  // without PN_COMING_SOON, opts is accepted but not used by this function
 }
 
 task container::schedule(int delay, handler *h) { return 
impl_->schedule(delay, h); }
 
+void container::client_connection_options(const connection_options &o) { 
impl_->client_connection_options(o); }  // setter: passes o straight through to impl_
+
+void container::server_connection_options(const connection_options &o) { 
impl_->server_connection_options(o); }  // setter: passes o straight through to impl_
+
 } // namespace proton

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp 
b/proton-c/bindings/cpp/src/container_impl.cpp
index 8a0b56e..bb4f4c5 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -19,6 +19,7 @@
  *
  */
 #include "proton/container.hpp"
+#include "proton/connection_options.hpp"
 #include "proton/event.hpp"
 #include "messaging_event.hpp"
 #include "proton/connection.hpp"
@@ -78,8 +79,9 @@ class override_handler : public handler
 {
   public:
     counted_ptr<pn_handler_t> base_handler;
+    container_impl &container_impl_;
 
-    override_handler(pn_handler_t *h) : base_handler(h) {}
+    override_handler(pn_handler_t *h, container_impl &c) : base_handler(h), 
container_impl_(c) {}
 
     virtual void on_unhandled(event &e) {
         proton_event *pne = dynamic_cast<proton_event *>(&e);
@@ -90,9 +92,17 @@ class override_handler : public handler
 
         pn_event_t *cevent = pne->pn_event();
         pn_connection_t *conn = pn_event_connection(cevent);
-        if (conn && type != PN_CONNECTION_INIT) {
+        if (conn) {
             handler *override = connection_context::get(conn).handler.get();
-            if (override) e.dispatch(*override);
+            if (override && type != PN_CONNECTION_INIT) {
+                // Send event to connector
+                e.dispatch(*override);
+            }
+            else if (!override && type == PN_CONNECTION_INIT) {
+                // Newly accepted connection from listener socket
+                connection c(conn);
+                container_impl_.configure_server_connection(c);
+            }
         }
         pn_handler_dispatch(base_handler.get(), cevent, (pn_event_type_t) 
type);
     }
@@ -124,7 +134,7 @@ container_impl::container_impl(container& c, handler *h, 
const std::string& id)
 
     // Set our own global handler that "subclasses" the existing one
     pn_handler_t *global_handler = reactor_.pn_global_handler();
-    override_handler_.reset(new override_handler(global_handler));
+    override_handler_.reset(new override_handler(global_handler, *this));
     counted_ptr<pn_handler_t> 
cpp_global_handler(cpp_handler(override_handler_.get()));
     reactor_.pn_global_handler(cpp_global_handler.get());
     if (handler_) {
@@ -141,10 +151,14 @@ container_impl::container_impl(container& c, handler *h, 
const std::string& id)
 
 container_impl::~container_impl() {}
 
-connection container_impl::connect(const proton::url &url, handler *h) {
+connection container_impl::connect(const proton::url &url, const 
connection_options &user_opts) {
+    connection_options opts = client_connection_options(); // Defaults
+    opts.override(user_opts);
+    handler *h = opts.handler();
+
     counted_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : 
counted_ptr<pn_handler_t>();
     connection conn(reactor_.connection(chandler.get()));
-    pn_unique_ptr<connector> ctor(new connector(conn));
+    pn_unique_ptr<connector> ctor(new connector(conn, opts));
     ctor->address(url);  // TODO: url vector
     connection_context& cc(conn.context());
     cc.container_impl = this;
@@ -154,7 +168,7 @@ connection container_impl::connect(const proton::url &url, 
handler *h) {
 }
 
 sender container_impl::open_sender(const proton::url &url) {
-    connection conn = connect(url, 0);
+    connection conn = connect(url, connection_options());
     std::string path = url.path();
     sender snd = conn.default_session().open_sender(id_ + '-' + path);
     snd.target().address(path);
@@ -163,7 +177,7 @@ sender container_impl::open_sender(const proton::url &url) {
 }
 
 receiver container_impl::open_receiver(const proton::url &url) {
-    connection conn = connect(url, 0);
+    connection conn = connect(url, connection_options());
     std::string path = url.path();
     receiver rcv = conn.default_session().open_receiver(id_ + '-' + path);
     rcv.source().address(path);
@@ -172,13 +186,28 @@ receiver container_impl::open_receiver(const proton::url 
&url) {
 }
 
 acceptor container_impl::listen(const proton::url& url) {
+#ifdef PN_COMING_SOON
+    connection_options opts = server_connection_options(); // Defaults
+    opts.override(user_opts);  // NOTE(review): user_opts is not a parameter of listen(url); this branch cannot compile as written once PN_COMING_SOON is defined
+    handler *h = opts.handler();
+    counted_ptr<pn_handler_t> chandler = h ? cpp_handler(h) : 
counted_ptr<pn_handler_t>();
+    pn_acceptor_t *acptr = pn_reactor_acceptor(
+        pn_cast(reactor_.get()), url.host().c_str(), url.port().c_str(), 
chandler.get());
+#else
     acceptor acptr = reactor_.listen(url);
-    if (!!acptr)
-        return acptr;
-    else
+#endif
+    if (!acptr)  // an acceptor that tests false means the listen failed: throw with the I/O error text and the url
         throw error(MSG("accept fail: " <<
                         pn_error_text(pn_io_error(reactor_.pn_io())))
                         << "(" << url << ")");
+#ifdef PN_COMING_SOON
+    // Do not use pn_acceptor_set_ssl_domain().  Manage the incoming 
connections ourselves for
+    // more flexibility (i.e. ability to change the server cert for a long 
running listener).
+    listener_context& lc(listener_context::get(acptr));
+    lc.connection_options = opts;  // remember the merged options on this listener's context
+    lc.ssl = url.scheme() == url::AMQPS;  // ssl flag is set exactly when the url scheme equals url::AMQPS
+#endif
+    return acptr;  // reached only when the failure check above did not throw
 }
 
 std::string container_impl::next_link_name() {
@@ -195,4 +224,30 @@ task container_impl::schedule(int delay, handler *h) {
     return reactor_.schedule(delay, task_handler.get());
 }
 
+void container_impl::client_connection_options(const connection_options &opts) 
{
+    client_connection_options_ = opts;  // stored by copy; connect() starts from this value and then applies the per-call overrides
+}
+
+void container_impl::server_connection_options(const connection_options &opts) 
{
+    server_connection_options_ = opts;  // stored by copy; configure_server_connection() applies it when PN_COMING_SOON is not defined
+}
+
+void container_impl::configure_server_connection(connection &c) {  // Applies server-side connection options to c
+#ifdef PN_COMING_SOON
+    pn_acceptor_t *pnp = pn_connection_acceptor(pn_cast(&c));
+    listener_context &lc(listener_context::get(pnp));  // per-acceptor context that carries that listener's connection_options
+    class connection_options &opts(lc.connection_options);
+    if (opts.sasl_enabled()) {  // SASL settings are touched only when the options enable SASL
+        sasl &s(c.transport().sasl());
+        s.allow_insecure_mechs(opts.allow_insecure_mechs());
+        if (opts.allowed_mechs())  // tested before the dereference below, so an unset value is skipped
+            s.allowed_mechs(*opts.allowed_mechs());
+    }
+    opts.apply(c);
+#else
+    // Can't distinguish between multiple listeners yet.  See PROTON-1054
+    server_connection_options_.apply(c);  // branch in use today: one container-wide set of server options for every connection
+#endif
+}
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp 
b/proton-c/bindings/cpp/src/container_impl.hpp
index 2d97780..d9401d0 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -47,7 +47,7 @@ class container_impl
   public:
     PN_CPP_EXTERN container_impl(container&, handler *, const std::string& id);
     PN_CPP_EXTERN ~container_impl();
-    PN_CPP_EXTERN connection connect(const url&, handler *h);
+    PN_CPP_EXTERN connection connect(const url&, const connection_options&);
     PN_CPP_EXTERN sender open_sender(connection &connection, const std::string 
&addr, handler *h);
     PN_CPP_EXTERN sender open_sender(const url&);
     PN_CPP_EXTERN receiver open_receiver(connection &connection, const 
std::string &addr, bool dynamic, handler *h);
@@ -55,7 +55,12 @@ class container_impl
     PN_CPP_EXTERN class acceptor listen(const url&);
     PN_CPP_EXTERN duration timeout();
     PN_CPP_EXTERN void timeout(duration timeout);
+    void client_connection_options(const connection_options &);
+    const connection_options& client_connection_options() { return 
client_connection_options_; }
+    void server_connection_options(const connection_options &);
+    const connection_options& server_connection_options() { return 
server_connection_options_; }
 
+    void configure_server_connection(connection &c);
     task schedule(int delay, handler *h);
     counted_ptr<pn_handler_t> cpp_handler(handler *h);
 
@@ -71,6 +76,8 @@ class container_impl
     pn_unique_ptr<handler> flow_controller_;
     std::string id_;
     uint64_t link_id_;
+    connection_options client_connection_options_;
+    connection_options server_connection_options_;
 
   friend class container;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/reconnect_timer.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp 
b/proton-c/bindings/cpp/src/reconnect_timer.cpp
new file mode 100644
index 0000000..2a44063
--- /dev/null
+++ b/proton-c/bindings/cpp/src/reconnect_timer.cpp
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/reconnect_timer.hpp"
+#include "proton/reactor.hpp"
+#include "proton/error.hpp"
+#include "msg.hpp"
+#include "proton/types.h"
+#include "proton/reactor.h"
+
+namespace proton {
+
+reconnect_timer::reconnect_timer(uint32_t first, int32_t max, uint32_t 
increment,
+                                 bool doubling, int32_t max_retries, int32_t 
timeout) :  // Stores the six settings as given and initialises the running state below
+    first_delay_(first), max_delay_(max), increment_(increment), 
doubling_(doubling),
+    max_retries_(max_retries), timeout_(timeout), retries_(0), 
next_delay_(-1), timeout_deadline_(0),  // running state: nothing tried yet, no delay computed, no deadline set
+    reactor_(0) {}  // no reactor attached yet; next_delay() throws while reactor_ tests false
+
+void reconnect_timer::reset() {  // Clears the running state; the configured limits and reactor_ are left untouched
+    retries_ = 0;
+    next_delay_ = 0;  // note: the constructor starts this at -1, reset() uses 0; next_delay() overwrites it on the first retry either way
+    timeout_deadline_ = 0;  // 0 is the "no deadline" value tested in next_delay()
+}
+
+int reconnect_timer::next_delay() {  // Advances the retry state; returns the next delay, or -1 when retries or the timeout are exhausted
+    retries_++;  // count this attempt before checking the cap
+    if (max_retries_ >= 0 && retries_ > max_retries_)  // a negative max_retries_ disables the cap
+        return -1;
+    if (!reactor_)  // the current time comes from the reactor, so it must have been set
+        throw error(MSG("reconnect timer missing reactor reference"));
+    pn_timestamp_t now = reactor_.now();
+
+    if (retries_ == 1) {  // first attempt since construction/reset()
+        if (timeout_ >= 0)  // a negative timeout_ leaves the deadline at 0, i.e. no overall timeout
+            timeout_deadline_ = now + timeout_;
+        next_delay_ = first_delay_;
+    } else if (retries_ == 2) {
+        next_delay_ += increment_;  // second attempt always adds increment_, even when doubling_ is set
+    } else {
+        next_delay_ += doubling_ ? next_delay_ : increment_;  // later attempts: double the delay, or grow it by increment_
+    }
+    if (timeout_deadline_ && now >= timeout_deadline_)  // deadline already reached
+        return -1;
+    if (max_delay_ >= 0 && next_delay_ > max_delay_)  // a negative max_delay_ means no upper bound
+        next_delay_ = max_delay_;
+    if (timeout_deadline_ && (now + next_delay_ > timeout_deadline_))
+        next_delay_ = timeout_deadline_ - now;  // shorten the delay so it ends exactly at the deadline
+    return next_delay_;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8e61c86a/proton-c/bindings/cpp/src/transport.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/transport.cpp 
b/proton-c/bindings/cpp/src/transport.cpp
index 6e6e612..0574663 100644
--- a/proton-c/bindings/cpp/src/transport.cpp
+++ b/proton-c/bindings/cpp/src/transport.cpp
@@ -20,6 +20,7 @@
  */
 #include "proton/transport.hpp"
 #include "proton/connection.hpp"
+#include "msg.hpp"
 #include "proton/transport.h"
 
 namespace proton {
@@ -28,4 +29,39 @@ connection transport::connection() const {
     return pn_transport_connection(pn_object());
 }
 
+void transport::unbind() {  // Calls pn_transport_unbind() on pn_object(); a non-zero result is turned into a thrown error
+    if (pn_transport_unbind(pn_object()))
+        throw error(MSG("transport::unbind failed " << 
pn_error_text(pn_transport_error(pn_object()))));  // message carries the text from the transport's error object
+}
+
+void transport::bind(class connection &conn) {  // `class connection`: transport also has a member function named connection(), so the type is spelled with an elaborated specifier
+//    pn_connection_t *c = static_cast<pn_connection_t*>(conn.object_);
+    if (pn_transport_bind(pn_object(), conn.pn_object()))  // a non-zero result from pn_transport_bind() is reported as a thrown error
+        throw error(MSG("transport::bind failed " << 
pn_error_text(pn_transport_error(pn_object()))));
+}
+
+uint32_t transport::max_frame_size() const {  // Returns pn_transport_get_max_frame() for pn_object()
+    return pn_transport_get_max_frame(pn_object());
+}
+
+uint32_t transport::remote_max_frame_size() const {  // Returns pn_transport_get_remote_max_frame() for pn_object()
+    return pn_transport_get_remote_max_frame(pn_object());
+}
+
+uint16_t transport::max_channels() const {  // Returns pn_transport_get_channel_max() for pn_object()
+    return pn_transport_get_channel_max(pn_object());
+}
+
+uint16_t transport::remote_max_channels() const {  // Returns pn_transport_remote_channel_max() for pn_object()
+    return pn_transport_remote_channel_max(pn_object());
+}
+
+uint32_t transport::idle_timeout() const {  // Returns pn_transport_get_idle_timeout() for pn_object()
+    return pn_transport_get_idle_timeout(pn_object());
+}
+
+uint32_t transport::remote_idle_timeout() const {  // Returns pn_transport_get_remote_idle_timeout() for pn_object()
+    return pn_transport_get_remote_idle_timeout(pn_object());
+}
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to