Repository: qpid-proton Updated Branches: refs/heads/master ded03b190 -> f53c7683d
PROTON-1959: [cpp] API additions to simplify reconnect messaging_handler::on_connection_start() - initial, exactly once event messaging_handler::on_connection_reconnecting() - disconnected with auto-reconnect pending bool connection::reconnected() - connection has been auto-reconnected Added connection life-cycle doc to the messaging_handler class API doc. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f53c7683 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f53c7683 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f53c7683 Branch: refs/heads/master Commit: f53c7683d7e903634b53db30d1f401006b43015a Parents: 77ddf4b Author: Alan Conway <[email protected]> Authored: Tue Oct 30 15:14:04 2018 -0400 Committer: Alan Conway <[email protected]> Committed: Tue Oct 30 16:37:01 2018 -0400 ---------------------------------------------------------------------- cpp/include/proton/connection.hpp | 13 +++- cpp/include/proton/messaging_handler.hpp | 103 ++++++++++++++---------- cpp/include/proton/reconnect_options.hpp | 4 +- cpp/src/connection.cpp | 6 ++ cpp/src/contexts.cpp | 2 +- cpp/src/contexts.hpp | 1 + cpp/src/handler.cpp | 6 +- cpp/src/messaging_adapter.cpp | 9 ++- cpp/src/proactor_container_impl.cpp | 74 +++++++++++------- cpp/src/proactor_container_impl.hpp | 4 +- cpp/src/reconnect_test.cpp | 108 ++++++++++++++++++++++---- 11 files changed, 238 insertions(+), 92 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/include/proton/connection.hpp ---------------------------------------------------------------------- diff --git a/cpp/include/proton/connection.hpp b/cpp/include/proton/connection.hpp index de9c904..ee90a04 100644 --- a/cpp/include/proton/connection.hpp +++ b/cpp/include/proton/connection.hpp @@ -82,14 +82,17 @@ PN_CPP_CLASS_EXTERN connection : public 
internal::object<pn_connection_t>, publi PN_CPP_EXTERN std::string user() const; /// Open the connection. - /// - /// @see endpoint_lifecycle + /// @see messaging_handler PN_CPP_EXTERN void open(); /// @copydoc open PN_CPP_EXTERN void open(const connection_options&); + /// Close the connection. + /// @see messaging_handler PN_CPP_EXTERN void close(); + + /// @copydoc close PN_CPP_EXTERN void close(const error_condition&); /// Open a new session. @@ -172,6 +175,12 @@ PN_CPP_CLASS_EXTERN connection : public internal::object<pn_connection_t>, publi /// execute code safely in the event-handler thread. PN_CPP_EXTERN void wake() const; + /// **Unsettled API** - true if this connection has been automatically + /// re-connected. + /// + /// @see reconnect_options, messaging_handler + PN_CPP_EXTERN bool reconnected() const; + /// @cond INTERNAL friend class internal::factory<connection>; friend class container; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/include/proton/messaging_handler.hpp ---------------------------------------------------------------------- diff --git a/cpp/include/proton/messaging_handler.hpp b/cpp/include/proton/messaging_handler.hpp index 6eb3755..fbefe8e 100644 --- a/cpp/include/proton/messaging_handler.hpp +++ b/cpp/include/proton/messaging_handler.hpp @@ -30,59 +30,65 @@ namespace proton { -/// A handler for Proton messaging events. + +/// Handler for Proton messaging events. /// /// Subclass and override the event-handling member functions. /// -/// Event handling functions can always use the objects passed as -/// arguments. +/// **Thread-safety**: A thread-safe handler can use the objects +/// passed as arguments, and other objects belonging to the same +/// proton::connection. It *must not* use objects belonging to a +/// different connection. See @ref mt_page and proton::work_queue for +/// safe ways to communicate between connections. Thread-safe +/// handlers can also be used in single-threaded code. 
+/// +/// **Single-threaded only**: An application is single-threaded if it +/// calls container::run() exactly once, and only makes proton +/// calls from handler functions. Single-threaded handler functions +/// can use objects belonging to another connection, but *must* call +/// connection::wake() on the other connection before returning. Such +/// a handler is not thread-safe. /// -/// @note A handler function **must not** use proton objects that are -/// not accessible via the arguments passed without taking extra -/// care. For example an on_message() handler called for connection -/// "A" cannot simply call sender::send() on a proton::sender -/// belonging to connection "B". +/// ### Connection life-cycle and automatic re-connect /// -/// **Thread-safety**: To be safe for both single- and multi-threaded -/// use, a handler **must not** directly use objects belonging to -/// another connection. See @ref mt_page and proton::work_queue for -/// safe ways to communicate. We recommend writing safe handlers to -/// avoid mysterious failures if the handler is ever used in a -/// multi-threaded container. +/// on_connection_start() is the first event for any connection. /// -/// **Single-threaded only**: An application is single-threaded if it -/// calls container::run() exactly once, and does not make proton -/// calls from any other thread. In this case a handler can use -/// objects belonging to another connection, but it must call -/// connection::wake() on the other connection before returning. Such -/// a handler will fail mysteriously if the container is run with -/// multiple threads. +/// on_connection_open() means the remote peer has sent an AMQP open. +/// For a client, this means the connection is fully open. 
A server +/// should respond with connection::open() or reject the request with +/// connection::close() /// -/// #### Close and error handling +/// on_connection_reconnecting() may be called if automatic re-connect +/// is enabled (see reconnect_options). It is called when the +/// connection is disconnected and a re-connect will be +/// attempted. Calling connection::close() will cancel the re-connect. /// -/// There are several objects that have `on_X_close` and `on_X_error` -/// functions. They are called as follows: +/// on_connection_open() will be called again on a successful +/// re-connect. Each open @ref session, @ref sender and @ref receiver +/// will also be automatically re-opened. On success, on_sender_open() +/// or on_receiver_open() are called, on failure on_sender_error() or +/// on_receiver_error(). /// -/// - If `X` is closed cleanly, with no error status, then `on_X_close` -/// is called. +/// on_connection_close() indicates orderly shut-down of the +/// connection. Servers should respond with connection::close(). +/// on_connection_close() is not called if the connection fails before +/// the remote end can do an orderly close. /// -/// - If `X` is closed with an error, then `on_X_error` is called, -/// followed by `on_X_close`. The error condition is also available -/// in `on_X_close` from `X::error()`. +/// on_transport_close() is always the final event for a connection, and +/// is always called regardless of how the connection closed or failed. /// -/// By default, if you do not implement `on_X_error`, it will call -/// `on_error`. If you do not implement `on_error` it will throw a -/// `proton::error` exception, which may not be what you want but -/// does help to identify forgotten error handling quickly. +/// If the connection or transport closes with an error, on_connection_error() +/// or on_transport_error() is called immediately before on_connection_close() or +/// on_transport_close(). 
You can also check for error conditions in the close +/// function with connection::error() or transport::error() /// -/// #### Resource cleanup +/// Note: closing a connection with the special error condition +/// `amqp:connection:forced` is treated as a disconnect - it triggers +/// automatic re-connect or on_transport_error()/on_transport_close(), +/// not on_connection_close(). +/// +/// @see reconnect_options /// -/// Every `on_X_open` event is paired with an `on_X_close` event which -/// can clean up any resources created by the open handler. In -/// particular this is still true if an error is reported with an -/// `on_X_error` event. The error-handling logic doesn't have to -/// manage resource clean up. It can assume that the close event will -/// be along to handle it. class PN_CPP_CLASS_EXTERN messaging_handler { public: @@ -111,15 +117,28 @@ PN_CPP_CLASS_EXTERN messaging_handler { PN_CPP_EXTERN virtual void on_transport_open(transport&); /// The underlying network transport has closed. + /// This is the final event for a connection, there will be + /// no more events or re-connect attempts. PN_CPP_EXTERN virtual void on_transport_close(transport&); - /// The underlying network transport has closed with an error - /// condition. + /// The underlying network transport has disconnected unexpectedly. PN_CPP_EXTERN virtual void on_transport_error(transport&); + /// **Unsettled API** - Called before the connection is opened. + /// Use for initial setup, e.g. to open senders or receivers. + PN_CPP_EXTERN virtual void on_connection_start(connection&); + /// The remote peer opened the connection. + /// Called for the initial open, and also after each successful re-connect if + /// @ref reconnect_options are set. PN_CPP_EXTERN virtual void on_connection_open(connection&); + /// **Unsettled API** - The connection has been disconnected and + /// is about to attempt an automatic re-connect. 
+ /// If on_connection_reconnecting() calls connection::close() then + /// the reconnect attempt will be canceled. + PN_CPP_EXTERN virtual void on_connection_reconnecting(connection&); + /// The remote peer closed the connection. PN_CPP_EXTERN virtual void on_connection_close(connection&); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/include/proton/reconnect_options.hpp ---------------------------------------------------------------------- diff --git a/cpp/include/proton/reconnect_options.hpp b/cpp/include/proton/reconnect_options.hpp index bc4e43a..2206eec 100644 --- a/cpp/include/proton/reconnect_options.hpp +++ b/cpp/include/proton/reconnect_options.hpp @@ -42,10 +42,10 @@ namespace proton { /// reconnection attempts. They may be open-ended or limited in time. /// They may be evenly spaced or increasing at an exponential rate. /// -/// Options can be "chained". See @ref proton::connection_options. -/// /// Normal value semantics: copy or assign creates a separate copy of /// the options. +/// +/// @see messaging_handler, connection_options::reconnect() class reconnect_options { public: /// Create an empty set of options. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/connection.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/connection.cpp b/cpp/src/connection.cpp index 223bb0c..f0219cd 100644 --- a/cpp/src/connection.cpp +++ b/cpp/src/connection.cpp @@ -192,4 +192,10 @@ std::vector<symbol> connection::desired_capabilities() const { return get_multiple<std::vector<symbol> >(caps); } +bool connection::reconnected() const { + connection_context& cc = connection_context::get(pn_object()); + reconnect_context* rc = cc.reconnect_context_.get(); + return (rc && rc->reconnected_); } + +} // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/contexts.cpp b/cpp/src/contexts.cpp index bd5e80a..9689de2 100644 --- a/cpp/src/contexts.cpp +++ b/cpp/src/contexts.cpp @@ -72,7 +72,7 @@ connection_context::connection_context() : {} reconnect_context::reconnect_context(const reconnect_options& ro) : - reconnect_options_(new reconnect_options(ro)), retries_(0), current_url_(-1) + reconnect_options_(new reconnect_options(ro)), retries_(0), current_url_(-1), reconnected_(false) {} listener_context::listener_context() : listen_handler_(0) {} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/contexts.hpp ---------------------------------------------------------------------- diff --git a/cpp/src/contexts.hpp b/cpp/src/contexts.hpp index 4c5d679..4a046a4 100644 --- a/cpp/src/contexts.hpp +++ b/cpp/src/contexts.hpp @@ -107,6 +107,7 @@ class reconnect_context { duration delay_; int retries_; int current_url_; + bool reconnected_; }; class listener_context : public context { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/handler.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/handler.cpp 
b/cpp/src/handler.cpp index 0e693a0..16fc38e 100644 --- a/cpp/src/handler.cpp +++ b/cpp/src/handler.cpp @@ -48,6 +48,7 @@ void messaging_handler::on_sendable(sender &) {} void messaging_handler::on_transport_close(transport &) {} void messaging_handler::on_transport_error(transport &t) { on_error(t.error()); } void messaging_handler::on_transport_open(transport &) {} + void messaging_handler::on_connection_close(connection &) {} void messaging_handler::on_connection_error(connection &c) { on_error(c.error()); } void messaging_handler::on_connection_open(connection &c) { @@ -55,6 +56,10 @@ void messaging_handler::on_connection_open(connection &c) { pn_connection_open(unwrap(c)); } } +void messaging_handler::on_connection_reconnecting(connection &) {} +void messaging_handler::on_connection_start(connection &) {} +void messaging_handler::on_connection_wake(connection&) {} + void messaging_handler::on_session_close(session &) {} void messaging_handler::on_session_error(session &s) { on_error(s.error()); } void messaging_handler::on_session_open(session &s) { @@ -85,7 +90,6 @@ void messaging_handler::on_tracker_settle(tracker &) {} void messaging_handler::on_delivery_settle(delivery &) {} void messaging_handler::on_sender_drain_start(sender &) {} void messaging_handler::on_receiver_drain_finish(receiver &) {} -void messaging_handler::on_connection_wake(connection&) {} void messaging_handler::on_error(const error_condition& c) { throw proton::error(c.what()); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/messaging_adapter.cpp b/cpp/src/messaging_adapter.cpp index e01f29f..91fbc6a 100644 --- a/cpp/src/messaging_adapter.cpp +++ b/cpp/src/messaging_adapter.cpp @@ -229,6 +229,13 @@ void on_connection_remote_close(messaging_handler& handler, pn_event_t* event) { pn_connection_close(conn); } +void on_connection_bound(messaging_handler& 
handler, pn_event_t* event) { + connection c(make_wrapper(pn_event_connection(event))); + if (!c.reconnected()) { // Call on_connection_start() on first connect + handler.on_connection_start(c); + } +} + void on_connection_remote_open(messaging_handler& handler, pn_event_t* event) { // Generate on_transport_open event here until we find a better place transport t(make_wrapper(pn_event_transport(event))); @@ -303,7 +310,7 @@ void messaging_adapter::dispatch(messaging_handler& handler, pn_event_t* event) // Only handle events we are interested in switch(type) { - + case PN_CONNECTION_BOUND: on_connection_bound(handler, event); break; case PN_CONNECTION_REMOTE_OPEN: on_connection_remote_open(handler, event); break; case PN_CONNECTION_REMOTE_CLOSE: on_connection_remote_close(handler, event); break; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/proactor_container_impl.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp index e0057cd..fe5e087 100644 --- a/cpp/src/proactor_container_impl.cpp +++ b/cpp/src/proactor_container_impl.cpp @@ -22,10 +22,12 @@ #include "proton/connect_config.hpp" #include "proton/error_condition.hpp" -#include "proton/listener.hpp" #include "proton/listen_handler.hpp" +#include "proton/listener.hpp" #include "proton/reconnect_options.hpp" +#include "proton/transport.hpp" #include "proton/url.hpp" + #include "proton/connection.h" #include "proton/listener.h" #include "proton/proactor.h" @@ -294,7 +296,11 @@ void container::impl::reset_reconnect(pn_connection_t* pnc) { rc->current_url_ = -1; } -bool container::impl::setup_reconnect(pn_connection_t* pnc) { +bool container::impl::can_reconnect(pn_connection_t* pnc) { + // Don't reconnect if we are locally closed, the application will + // not expect a connection it closed to re-open. 
+ if (pn_connection_state(pnc) & PN_LOCAL_CLOSED) return false; + // If container stopping don't try to reconnect // - we pretend to have set up a reconnect attempt so // that the proactor disconnect will finish and we will exit @@ -322,6 +328,15 @@ bool container::impl::setup_reconnect(pn_connection_t* pnc) { pn_condition_format(condition, "proton:io", "Too many reconnect attempts (%d)", rc->retries_); return false; } + return true; +} + +void container::impl::setup_reconnect(pn_connection_t* pnc) { + connection_context& cc = connection_context::get(pnc); + reconnect_context* rc = cc.reconnect_context_.get(); + if (!rc) return; + + rc->reconnected_ = true; // Recover connection from proactor pn_proactor_release_connection(pnc); @@ -333,8 +348,6 @@ bool container::impl::setup_reconnect(pn_connection_t* pnc) { // now anyway schedule(delay, make_work(&container::impl::reconnect, this, pnc)); ++reconnecting_; - - return true; } returned<connection> container::impl::connect( @@ -622,10 +635,6 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) { case PN_CONNECTION_INIT: return ContinueLoop; - // We've already applied options, so don't need to do it here - case PN_CONNECTION_BOUND: - return ContinueLoop; - case PN_CONNECTION_REMOTE_OPEN: { // This is the only event that we get indicating that the connection succeeded so // it's the only place to reset the reconnection logic. @@ -639,15 +648,17 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) { pn_connection_t *c = pn_event_connection(event); pn_condition_t *cc = pn_connection_remote_condition(c); - // If we got a close with a condition of amqp:connection:forced then don't send - // any close/error events now. 
Just set the error condition on the transport and - // close the connection - This should act as though a transport error occurred - if (pn_condition_is_set(cc) - && !strcmp(pn_condition_get_name(cc), "amqp:connection:forced")) { + // amqp:connection:forced should be treated like a transport + // disconnect. Hide the connection error/close events from the + // application and generate a PN_TRANSPORT_CLOSE event. + if (pn_condition_is_set(cc) && + !strcmp(pn_condition_get_name(cc), "amqp:connection:forced")) + { pn_transport_t* t = pn_event_transport(event); pn_condition_t* tc = pn_transport_condition(t); pn_condition_copy(tc, cc); - pn_connection_close(c); + pn_transport_close_head(t); + pn_transport_close_tail(t); return ContinueLoop; } break; @@ -656,19 +667,35 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) { // If reconnect is turned on then handle closed on error here with reconnect attempt pn_connection_t* c = pn_event_connection(event); pn_transport_t* t = pn_event_transport(event); - // If we successfully schedule a re-connect then hide the event from - // user handlers by returning here. - if (pn_condition_is_set(pn_transport_condition(t)) && setup_reconnect(c)) return ContinueLoop; + if (pn_condition_is_set(pn_transport_condition(t)) && can_reconnect(c)) { + messaging_handler *mh = get_handler(event); + if (mh) { // Notify handler of pending reconnect + connection conn = make_wrapper(c); + mh->on_connection_reconnecting(conn); + } + // on_connection_reconnecting() may have closed the connection, check again. + if (!(pn_connection_state(c) & PN_LOCAL_CLOSED)) { + setup_reconnect(c); + return ContinueLoop; + } + } // Otherwise, this connection will be freed by the proactor. // Mark its work_queue finished so it won't try to use the freed connection. 
connection_context::get(c).work_queue_.impl_.get()->finished(); + break; } default: break; } - // Figure out the handler for the primary object for event - messaging_handler* mh = 0; + messaging_handler *mh = get_handler(event); + if (mh) messaging_adapter::dispatch(*mh, event); + return ContinueLoop; +} + +// Figure out the handler for the primary object for event +messaging_handler* container::impl::get_handler(pn_event_t *event) { + messaging_handler *mh = 0; // First try for a link (send/receiver) handler pn_link_t *link = pn_event_link(event); @@ -683,14 +710,7 @@ container::impl::dispatch_result container::impl::dispatch(pn_event_t* event) { if (connection && !mh) mh = get_handler(connection); // Use container handler if nothing more specific (must be a container handler) - if (!mh) mh = handler_; - - // If we still have no handler don't do anything! - // This is pretty unusual, but possible if we use the default constructor for container - if (!mh) return ContinueLoop; - - messaging_adapter::dispatch(*mh, event); - return ContinueLoop; + return mh ? 
mh : handler_; } void container::impl::thread() { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/proactor_container_impl.hpp ---------------------------------------------------------------------- diff --git a/cpp/src/proactor_container_impl.hpp b/cpp/src/proactor_container_impl.hpp index 912a036..0f0ef60 100644 --- a/cpp/src/proactor_container_impl.hpp +++ b/cpp/src/proactor_container_impl.hpp @@ -94,6 +94,7 @@ class container::impl { void schedule(duration, work); template <class T> static void set_handler(T s, messaging_handler* h); template <class T> static messaging_handler* get_handler(T s); + messaging_handler* get_handler(pn_event_t *event); static work_queue::impl* make_work_queue(container&); private: @@ -106,7 +107,8 @@ class container::impl { void start_connection(const url& url, pn_connection_t* c); void reconnect(pn_connection_t* pnc); duration next_delay(reconnect_context& rc); - bool setup_reconnect(pn_connection_t* pnc); + bool can_reconnect(pn_connection_t* pnc); + void setup_reconnect(pn_connection_t* pnc); void reset_reconnect(pn_connection_t* pnc); // Event loop to run in each container thread http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f53c7683/cpp/src/reconnect_test.cpp ---------------------------------------------------------------------- diff --git a/cpp/src/reconnect_test.cpp b/cpp/src/reconnect_test.cpp index f29cd53..5a56cf1 100644 --- a/cpp/src/reconnect_test.cpp +++ b/cpp/src/reconnect_test.cpp @@ -17,18 +17,19 @@ * under the License. 
*/ - #include "test_bits.hpp" -#include "proton/error_condition.hpp" #include "proton/connection.hpp" #include "proton/connection_options.hpp" #include "proton/container.hpp" #include "proton/delivery.hpp" +#include "proton/error_condition.hpp" +#include "proton/listen_handler.hpp" +#include "proton/listener.hpp" #include "proton/message.hpp" #include "proton/messaging_handler.hpp" -#include "proton/listener.hpp" -#include "proton/listen_handler.hpp" #include "proton/reconnect_options.hpp" +#include "proton/receiver_options.hpp" +#include "proton/transport.hpp" #include "proton/work_queue.hpp" #include "proton/internal/pn_unique_ptr.hpp" @@ -77,7 +78,6 @@ class server_connection_handler : public proton::messaging_handler { int messages_; int expect_; bool closing_; - bool done_; listen_handler listen_handler_; void close (proton::connection &c) { @@ -89,7 +89,7 @@ class server_connection_handler : public proton::messaging_handler { public: server_connection_handler(proton::container& c, int e, waiter& w) - : messages_(0), expect_(e), closing_(false), done_(false), listen_handler_(*this, w) + : messages_(0), expect_(e), closing_(false), listen_handler_(*this, w) { listener_ = c.listen("//:0", listen_handler_); } @@ -106,14 +106,19 @@ class server_connection_handler : public proton::messaging_handler { else c.open(); } + void on_receiver_open(proton::receiver &r) PN_CPP_OVERRIDE { + // Reduce message noise in PN_TRACE output for debugging. + // Only the first message is relevant + // Control accepts, accepting the message tells the client to finally close. 
+ r.open(proton::receiver_options().credit_window(0).auto_accept(false)); + r.add_credit(1); + } + void on_message(proton::delivery & d, proton::message & m) PN_CPP_OVERRIDE { ++messages_; proton::connection c = d.connection(); if (messages_==expect_) close(c); - } - - void on_connection_close(proton::connection & c) PN_CPP_OVERRIDE { - done_ = true; + else d.accept(); } void on_transport_error(proton::transport & ) PN_CPP_OVERRIDE { @@ -125,7 +130,9 @@ class server_connection_handler : public proton::messaging_handler { class tester : public proton::messaging_handler, public waiter { public: - tester() : waiter(3), container_(*this, "reconnect_server") {} + tester() : waiter(3), container_(*this, "reconnect_client"), + start_count(0), open_count(0), reconnecting_count(0), + link_open_count(0), transport_error_count(0), transport_close_count(0) {} void on_container_start(proton::container &c) PN_CPP_OVERRIDE { // Server that fails upon connection @@ -144,22 +151,53 @@ class tester : public proton::messaging_handler, public waiter { container_.connect(s1->url(), proton::connection_options().reconnect(proton::reconnect_options().failover_urls(urls))); } - void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE { + void on_connection_start(proton::connection& c) PN_CPP_OVERRIDE { + start_count++; c.open_sender("messages"); + ASSERT(!c.reconnected()); + } + + void on_connection_open(proton::connection& c) PN_CPP_OVERRIDE { + ASSERT(bool(open_count) == c.reconnected()); + open_count++; + } + + void on_connection_reconnecting(proton::connection& c) PN_CPP_OVERRIDE { + reconnecting_count++; + } + + void on_sender_open(proton::sender &s) PN_CPP_OVERRIDE { + ASSERT(bool(link_open_count) == s.connection().reconnected()); + link_open_count++; } void on_sendable(proton::sender& s) PN_CPP_OVERRIDE { - proton::message m; - m.body("hello"); - s.send(m); + s.send(proton::message("hello")); } void on_tracker_accept(proton::tracker& d) PN_CPP_OVERRIDE { 
d.connection().close(); } + void on_transport_error(proton::transport& t) PN_CPP_OVERRIDE { + transport_error_count++; + } + + void on_transport_close(proton::transport& t) PN_CPP_OVERRIDE { + transport_close_count++; + } + void run() { container_.run(); + ASSERT_EQUAL(1, start_count); + ASSERT_EQUAL(3, open_count); + ASSERT(2 < reconnecting_count); + // Last reconnect fails before opening links + ASSERT(link_open_count > 1); + // All transport errors should have been hidden + ASSERT_EQUAL(0, transport_error_count); + // One final transport close, not an error + ASSERT_EQUAL(1, transport_close_count); } private: @@ -167,6 +205,7 @@ class tester : public proton::messaging_handler, public waiter { proton::internal::pn_unique_ptr<server_connection_handler> s2; proton::internal::pn_unique_ptr<server_connection_handler> s3; proton::container container_; + int start_count, open_count, reconnecting_count, link_open_count, transport_error_count, transport_close_count; }; int test_failover_simple() { @@ -245,6 +284,44 @@ class authfail_reconnect_tester : public proton::messaging_handler, public waite bool errored_; }; +// Verify we can stop reconnecting by calling close() in on_connection_reconnecting() +class test_reconnecting_close : public proton::messaging_handler, public waiter { + public: + test_reconnecting_close() : waiter(1), container_(*this, "test_reconnecting_close"), + reconnecting_called(false) {} + + void on_container_start(proton::container &c) PN_CPP_OVERRIDE { + s1.reset(new server_connection_handler(c, 0, *this)); + } + + void ready() PN_CPP_OVERRIDE { + container_.connect(s1->url(), proton::connection_options().reconnect(proton::reconnect_options())); + } + + void on_connection_reconnecting(proton::connection& c) PN_CPP_OVERRIDE { + reconnecting_called = true; + c.close(); // Abort reconnection + } + + void on_connection_close(proton::connection& c) PN_CPP_OVERRIDE { + ASSERT(0); // Not expecting any clean close + } + + void 
on_transport_error(proton::transport& t) PN_CPP_OVERRIDE { + // Expected, don't throw + } + + void run() { + container_.run(); + } + + private: + proton::container container_; + std::string err_; + bool reconnecting_called; + proton::internal::pn_unique_ptr<server_connection_handler> s1; +}; + int test_auth_fail_reconnect() { authfail_reconnect_tester().run(); return 0; @@ -255,6 +332,7 @@ int main(int argc, char** argv) { RUN_ARGV_TEST(failed, test_failover_simple()); RUN_ARGV_TEST(failed, test_stop_reconnect()); RUN_ARGV_TEST(failed, test_auth_fail_reconnect()); + RUN_ARGV_TEST(failed, test_reconnecting_close().run()); return failed; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
