Repository: activemq-cpp
Updated Branches:
  refs/heads/master e3acfcdb5 -> 59e62e175
https://issues.apache.org/jira/browse/AMQCPP-609

Watch for shutdown and don't reconnect if the remote drops the connection
before the close on the transport is registered.

(cherry picked from commit f93fa9ea187693c26614cee17e7a7855cea3e7c0)

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/59e62e17
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/59e62e17
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/59e62e17

Branch: refs/heads/master
Commit: 59e62e175184b262bda11b68f32efbb4e2c3bf36
Parents: e3acfcd
Author: Timothy Bish <[email protected]>
Authored: Wed Feb 8 17:35:45 2017 -0500
Committer: Timothy Bish <[email protected]>
Committed: Wed Feb 8 17:36:11 2017 -0500

----------------------------------------------------------------------
 .../transport/failover/FailoverTransport.cpp | 34 ++++++++++++++++----
 1 file changed, 28 insertions(+), 6 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/59e62e17/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp index 3e1059a..b5ea33b 100644 --- a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp +++ b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp @@ -94,6 +94,7 @@ namespace failover { bool rebalanceUpdateURIs; bool priorityBackup; bool backupsEnabled; + volatile bool shutdown; bool doRebalance; bool connectedToPrioirty; @@ -143,6 +144,7 @@ namespace failover { rebalanceUpdateURIs(true), priorityBackup(false), backupsEnabled(false), + shutdown(false), doRebalance(false), connectedToPrioirty(false), reconnectMutex(), @@ -226,7 +228,7 @@ 
namespace failover { try { ioException = this->connectionFailure.dynamicCast<IOException>(); } - AMQ_CATCH_NOTHROW( ClassCastException) + AMQ_CATCH_NOTHROW(ClassCastException) if (ioException != NULL) { transportListener->onException(*this->connectionFailure); @@ -272,6 +274,9 @@ namespace failover { } } + bool willReconnect() { + return firstConnection || 0 != calculateReconnectAttemptLimit(); + } }; const int FailoverTransportImpl::DEFAULT_INITIAL_RECONNECT_DELAY = 10; @@ -435,7 +440,9 @@ void FailoverTransport::oneway(const Pointer<Command> command) { long long start = System::currentTimeMillis(); bool timedout = false; - while (transport == NULL && !this->impl->closed && this->impl->connectionFailure == NULL) { + while (transport == NULL && !this->impl->closed && + this->impl->connectionFailure == NULL && this->impl->willReconnect()) { + long long end = System::currentTimeMillis(); if (command->isMessage() && this->impl->timeout > 0 && (end - start > this->impl->timeout)) { timedout = true; @@ -455,6 +462,9 @@ void FailoverTransport::oneway(const Pointer<Command> command) { } else if (timedout == true) { error.reset(new IOException(__FILE__, __LINE__, "Failover timeout of %d ms reached.", this->impl->timeout)); + } else if (!this->impl->willReconnect()) { + error.reset(new IOException(__FILE__, __LINE__, + "Maximum reconnection attempts exceeded")); } else { error.reset(new IOException(__FILE__, __LINE__, "Unexpected failure.")); } @@ -468,7 +478,7 @@ void FailoverTransport::oneway(const Pointer<Command> command) { Pointer<Tracked> tracked; try { tracked = stateTracker.track(command); - synchronized( &this->impl->requestMap ) { + synchronized(&this->impl->requestMap) { if (tracked != NULL && tracked->isWaitingForResponse()) { this->impl->requestMap.put(command->getCommandId(), tracked); } else if (tracked == NULL && command->isResponseRequired()) { @@ -485,13 +495,15 @@ void FailoverTransport::oneway(const Pointer<Command> command) { try { 
transport->oneway(command); stateTracker.trackBack(command); + if (command->isShutdownInfo()) { + this->impl->shutdown = true; + } } catch (IOException& e) { e.setMark(__FILE__, __LINE__); - // If the command was not tracked.. we will retry in - // this method - if (tracked == NULL) { + // If the command was not tracked.. we will retry in this method + if (tracked == NULL && this->impl->canReconnect()) { // since we will retry in this method.. take it out of the // request map so that it is not sent 2 times on recovery @@ -690,8 +702,18 @@ void FailoverTransport::restoreTransport(const Pointer<Transport> transport) { //////////////////////////////////////////////////////////////////////////////// void FailoverTransport::handleTransportFailure(const decaf::lang::Exception& error) { + if (this->impl->shutdown) { + // shutdown info sent and remote socket closed and we see that before a local close + // let the close do the work + return; + } + synchronized(&this->impl->reconnectMutex) { + if (this->impl->shutdown) { + return; + } + Pointer<Transport> transport; this->impl->connectedTransport.swap(transport);
