https://issues.apache.org/jira/browse/AMQCPP-601
Fixes for priority backup handling to prevent stall when reconnecting tries to use a priority instance. Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/a8ff8b54 Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/a8ff8b54 Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/a8ff8b54 Branch: refs/heads/3.9.x Commit: a8ff8b54e0d75fb751f5b76682c715a78ee8150d Parents: 2c99860 Author: Timothy Bish <[email protected]> Authored: Tue Feb 14 16:56:48 2017 -0500 Committer: Timothy Bish <[email protected]> Committed: Tue Feb 14 16:56:48 2017 -0500 ---------------------------------------------------------------------- .../transport/failover/BackupTransportPool.cpp | 53 +- .../transport/failover/FailoverTransport.cpp | 32 +- .../activemq/transport/failover/URIPool.cpp | 4 +- .../test/activemq/mock/MockBrokerService.cpp | 28 +- .../src/test/activemq/mock/MockBrokerService.h | 2 + .../failover/FailoverTransportTest.cpp | 241 +++++ .../transport/failover/FailoverTransportTest.h | 6 + activemq-cpp/src/test/testRegistry.cpp | 996 +++++++++---------- 8 files changed, 825 insertions(+), 537 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/a8ff8b54/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp b/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp index e80ff51..00993d7 100644 --- a/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp +++ b/activemq-cpp/src/main/activemq/transport/failover/BackupTransportPool.cpp @@ -53,14 +53,37 @@ namespace failover { public: + BackupTransportPool* pool; + FailoverTransport* parent; LinkedList< Pointer<BackupTransport> > backups; volatile bool pending; volatile bool closed; volatile int priorityBackups; - BackupTransportPoolImpl() : backups(), pending(false), closed(false), priorityBackups(0) { + BackupTransportPoolImpl(BackupTransportPool* pool, FailoverTransport* parent) : pool(pool), + parent(parent), + backups(), + pending(false), + closed(false), + priorityBackups(0) { } + bool shouldBuildBackup() { + bool result = false; + + if (pool->isEnabled()) { + + // If there's no priority backup and the failover transport isn't connected to + // a priority backup then we should keep trying to connect to one. + if (parent->isPriorityBackup() && !parent->isConnectedToPriority() && priorityBackups == 0) { + result = true; + } else if (backups.size() < pool->getBackupPoolSize()) { + result = true; + } + } + + return result; + } }; }}} @@ -101,7 +124,7 @@ BackupTransportPool::BackupTransportPool(FailoverTransport* parent, throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL"); } - this->impl = new BackupTransportPoolImpl(); + this->impl = new BackupTransportPoolImpl(this, parent); // Add this instance as a Task so that we can create backups when nothing else is // going on. @@ -145,7 +168,7 @@ BackupTransportPool::BackupTransportPool(FailoverTransport* parent, throw NullPointerException(__FILE__, __LINE__, "Close Transport Task passed is NULL"); } - this->impl = new BackupTransportPoolImpl(); + this->impl = new BackupTransportPoolImpl(this, parent); // Add this instance as a Task so that we can create backups when nothing else is // going on. @@ -242,7 +265,7 @@ bool BackupTransportPool::iterate() { bool wakeupParent = false; - while (isEnabled() && (int) this->impl->backups.size() < backupPoolSize) { + while (impl->shouldBuildBackup()) { URI connectTo; @@ -259,14 +282,6 @@ bool BackupTransportPool::iterate() { Pointer<BackupTransport> backup(new BackupTransport(this)); backup->setUri(connectTo); - if (priorityUriPool->contains(connectTo)) { - backup->setPriority(true); - - if (!parent->isConnectedToPriority()) { - wakeupParent = true; - } - } - try { Pointer<Transport> transport = createTransport(connectTo); @@ -274,6 +289,14 @@ bool BackupTransportPool::iterate() { transport->start(); backup->setTransport(transport); + if (priorityUriPool->contains(connectTo) || (priorityUriPool->isEmpty() && uriPool->isPriority(connectTo))) { + backup->setPriority(true); + + if (!parent->isConnectedToPriority()) { + wakeupParent = true; + } + } + // Put any priority connections first so a reconnect picks them // up automatically. if (backup->isPriority()) { @@ -282,6 +305,7 @@ bool BackupTransportPool::iterate() { } else { this->impl->backups.addLast(backup); } + } catch (...) { // Store it in the list of URIs that didn't work, once done we // return those to the pool. @@ -289,15 +313,16 @@ bool BackupTransportPool::iterate() { } // We connected to a priority backup and the parent isn't already using one - // so wake it up and quick the backups process for now. + // so wake it up and quit the backups process for now. if (wakeupParent) { - this->parent->reconnect(true); + this->parent->reconnect(false); break; } } // return all failures to the URI Pool, we can try again later. uriPool->addURIs(failures); + this->impl->pending = false; } http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/a8ff8b54/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp index 9ee7a91..1b61adb 100644 --- a/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp +++ b/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp @@ -71,9 +71,9 @@ namespace failover { public: - bool closed; - bool connected; - bool started; + volatile bool closed; + volatile bool connected; + volatile bool started; long long timeout; long long initialReconnectDelay; @@ -253,7 +253,7 @@ namespace failover { } bool isConnectionStateValid() const { - return connectedTransport != NULL && !doRebalance && !this->backups->isPriorityBackupAvailable(); + return connectedTransport != NULL && !doRebalance && !backups->isPriorityBackupAvailable(); } void disconnect() { @@ -273,6 +273,10 @@ namespace failover { this->uris->addURI(*this->connectedTransportURI); this->connectedTransportURI.reset(NULL); } + + if (transportListener != NULL) { + transportListener->transportInterrupted(); + } } } @@ -853,16 +857,14 @@ void FailoverTransport::updateURIs(bool rebalance, const decaf::util::List<decaf bool FailoverTransport::isPending() const { bool result = false; - synchronized(&this->impl->reconnectMutex) { - if (!this->impl->isConnectionStateValid() && this->impl->started && !this->impl->isClosedOrFailed()) { + synchronized(&impl->reconnectMutex) { + if (!impl->isConnectionStateValid() && impl->started && !impl->isClosedOrFailed()) { - int maxReconnectAttempts = this->impl->calculateReconnectAttemptLimit(); + int maxReconnectAttempts = impl->calculateReconnectAttemptLimit(); if (impl->firstConnection && impl->connectFailures == 0) { - return true; - } - - if (maxReconnectAttempts != -1 && this->impl->connectFailures > maxReconnectAttempts) { + result = true; + } else if (maxReconnectAttempts != -1 && impl->connectFailures > maxReconnectAttempts) { result = false; } else { result = true; @@ -878,7 +880,7 @@ bool FailoverTransport::iterate() { Pointer<Exception> failure; - synchronized( &this->impl->reconnectMutex ) { + synchronized(&this->impl->reconnectMutex) { if (this->impl->isClosedOrFailed()) { this->impl->reconnectMutex.notifyAll(); @@ -890,7 +892,7 @@ bool FailoverTransport::iterate() { Pointer<URIPool> connectList = this->impl->getConnectList(); - if (connectList->isEmpty()) { + if (connectList->isEmpty() && !impl->backups->isEnabled()) { failure.reset(new IOException(__FILE__, __LINE__, "No URIs available for reconnect.")); } else { @@ -939,7 +941,7 @@ bool FailoverTransport::iterate() { } } - while (transport == NULL && this->impl->connectedTransport == NULL && !this->impl->closed) { + while ((transport != NULL || !connectList->isEmpty()) && this->impl->connectedTransport == NULL && !this->impl->closed) { try { // We could be starting the loop with a backup already. if (transport == NULL) { @@ -964,6 +966,7 @@ bool FailoverTransport::iterate() { this->impl->connectedTransport = transport; this->impl->reconnectMutex.notifyAll(); this->impl->connectFailures = 0; + this->impl->connected = true; if (isPriorityBackup()) { this->impl->connectedToPrioirty = connectList->getPriorityURI().equals(uri) || @@ -1012,6 +1015,7 @@ bool FailoverTransport::iterate() { // this prevents a deadlock from occurring if the Transport happens // to call back through our onException method or locks in some other // way. + this->impl->connected = false; this->impl->closeTask->add(transport); this->impl->taskRunner->wakeup(); transport.reset(NULL); http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/a8ff8b54/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp b/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp index 1c7171a..7c8f947 100644 --- a/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp +++ b/activemq-cpp/src/main/activemq/transport/failover/URIPool.cpp @@ -161,9 +161,7 @@ bool URIPool::contains(const decaf::net::URI& uri) const { //////////////////////////////////////////////////////////////////////////////// bool URIPool::isPriority(const decaf::net::URI& uri) const { synchronized(&uriPool) { - if (!uriPool.isEmpty()) { - return uriPool.getFirst().equals(uri); - } + return priorityURI.equals(uri); } return false; } http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/a8ff8b54/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp b/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp index dea1bbc..ceb6634 100644 --- a/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp +++ b/activemq-cpp/src/test/activemq/mock/MockBrokerService.cpp @@ -55,8 +55,9 @@ namespace mock { class TcpServer : public lang::Thread { private: - bool done; - bool error; + volatile bool done; + volatile bool error; + const int configuredPort; Pointer<ServerSocket> server; Pointer<OpenWireFormat> wireFormat; Pointer<OpenWireResponseBuilder> responeBuilder; @@ -65,9 +66,8 @@ namespace mock { public: - TcpServer() : Thread(), done(false), error(false), server(), wireFormat(), + TcpServer() : Thread(), done(false), error(false), configuredPort(0), server(), wireFormat(), responeBuilder(), started(1), rand() { - server.reset(new ServerSocket(0)); Properties properties; @@ -77,9 +77,8 @@ namespace mock { this->rand.setSeed(System::currentTimeMillis()); } - TcpServer(int port) : Thread(), done(false), error(false), server(), wireFormat(), + TcpServer(int port) : Thread(), done(false), error(false), configuredPort(port), server(), wireFormat(), responeBuilder(), started(1), rand() { - server.reset(new ServerSocket(port)); Properties properties; this->wireFormat = OpenWireFormatFactory().createWireFormat(properties).dynamicCast<OpenWireFormat>(); @@ -90,6 +89,7 @@ namespace mock { virtual ~TcpServer() { stop(); + waitUntilStopped(); } int getLocalPort() { @@ -124,7 +124,15 @@ namespace mock { MockTransport mock(this->wireFormat, this->responeBuilder); - std::auto_ptr<Socket> socket(server->accept()); + server.reset(new ServerSocket(configuredPort)); + + std::auto_ptr<Socket> socket; + try { + socket.reset(server->accept()); + } catch (IOException& ioe) { + continue; + } + socket->setSoLinger(false, 0); Pointer<WireFormatInfo> preferred = wireFormat->getPreferedWireFormatInfo(); @@ -147,7 +155,6 @@ namespace mock { } } } - } catch (IOException& ex) { error = true; } catch (Exception& ex) { @@ -228,6 +235,11 @@ void MockBrokerService::waitUntilStopped() { } //////////////////////////////////////////////////////////////////////////////// +int MockBrokerService::getPort() const { + return this->impl->server->getLocalPort(); +} + +//////////////////////////////////////////////////////////////////////////////// std::string MockBrokerService::getConnectString() const { int port = this->impl->server->getLocalPort(); return std::string("tcp://localhost:") + Integer::toString(port); http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/a8ff8b54/activemq-cpp/src/test/activemq/mock/MockBrokerService.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test/activemq/mock/MockBrokerService.h b/activemq-cpp/src/test/activemq/mock/MockBrokerService.h index 10dfc8a..92f4b76 100644 --- a/activemq-cpp/src/test/activemq/mock/MockBrokerService.h +++ b/activemq-cpp/src/test/activemq/mock/MockBrokerService.h @@ -57,6 +57,8 @@ namespace mock { std::string getConnectString() const; + int getPort() const; + }; }} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/a8ff8b54/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp index 89307f6..1928f0a 100644 --- a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp +++ b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.cpp @@ -25,6 +25,8 @@ #include <activemq/commands/WireFormatInfo.h> #include <activemq/commands/ConnectionControl.h> #include <activemq/mock/MockBrokerService.h> +#include <decaf/util/concurrent/CountDownLatch.h> +#include <decaf/util/concurrent/Mutex.h> #include <decaf/lang/Pointer.h> #include <decaf/lang/Thread.h> #include <decaf/util/UUID.h> @@ -39,6 +41,7 @@ using namespace activemq::exceptions; using namespace decaf::io; using namespace decaf::lang; using namespace decaf::util; +using namespace decaf::util::concurrent; //////////////////////////////////////////////////////////////////////////////// FailoverTransportTest::FailoverTransportTest() { @@ -796,3 +799,241 @@ void FailoverTransportTest::testStartupMaxReconnectsHonorsConfiguration() { transport->close(); } + +//////////////////////////////////////////////////////////////////////////////// +namespace { + + class PriorityBackupListener : public DefaultTransportListener { + private: + + Pointer<CountDownLatch> interruptedLatch; + Pointer<CountDownLatch> resumedLatch; + + Mutex resetMutex; + + public: + + PriorityBackupListener() : interruptedLatch(new CountDownLatch(1)), + resumedLatch(new CountDownLatch(1)), + resetMutex() { + } + + virtual ~PriorityBackupListener() {} + + virtual void transportInterrupted() { + interruptedLatch->countDown(); + } + + virtual void transportResumed() { + resumedLatch->countDown(); + } + + void reset() { + synchronized(&resetMutex) { + interruptedLatch.reset(new CountDownLatch(1)); + resumedLatch.reset(new CountDownLatch(1)); + } + } + + bool awaitInterruption() { + synchronized(&resetMutex) { + return interruptedLatch->await(60000); + } + + return false; + } + + bool awaitResumed() { + synchronized(&resetMutex) { + return resumedLatch->await(60000); + } + + return false; + } + }; + +} + +//////////////////////////////////////////////////////////////////////////////// +void FailoverTransportTest::testConnectedToPriorityOnFirstTryThenFailover() { + + Pointer<MockBrokerService> broker1(new MockBrokerService(61626)); + Pointer<MockBrokerService> broker2(new MockBrokerService(61628)); + + broker1->start(); + broker1->waitUntilStarted(); + + broker2->start(); + broker2->waitUntilStarted(); + + std::string uri = "failover://(tcp://localhost:61626," + "tcp://localhost:61628)?randomize=false&priorityBackup=true"; + + PriorityBackupListener listener; + FailoverTransportFactory factory; + + Pointer<Transport> transport(factory.create(uri)); + CPPUNIT_ASSERT(transport != NULL); + transport->setTransportListener(&listener); + + FailoverTransport* failover = + dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport))); + + CPPUNIT_ASSERT(failover != NULL); + CPPUNIT_ASSERT(failover->isRandomize() == false); + CPPUNIT_ASSERT(failover->isPriorityBackup() == true); + + transport->start(); + + CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed()); + listener.reset(); + + CPPUNIT_ASSERT(failover->isConnected() == true); + CPPUNIT_ASSERT(failover->isConnectedToPriority() == true); + + broker1->stop(); + broker1->waitUntilStopped(); + + CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption()); + CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed()); + listener.reset(); + + CPPUNIT_ASSERT(failover->isConnected() == true); + CPPUNIT_ASSERT(failover->isConnectedToPriority() == false); + + transport->close(); + + broker1->stop(); + broker1->waitUntilStopped(); + + broker2->stop(); + broker2->waitUntilStopped(); +} + +//////////////////////////////////////////////////////////////////////////////// +void FailoverTransportTest::testConnectsToPriorityOnceStarted() { + + Pointer<MockBrokerService> broker1(new MockBrokerService(61626)); + Pointer<MockBrokerService> broker2(new MockBrokerService(61628)); + + broker2->start(); + broker2->waitUntilStarted(); + + std::string uri = "failover://(tcp://localhost:61626?transport.useInactivityMonitor=false," + "tcp://localhost:61628?transport.useInactivityMonitor=false)?randomize=false&priorityBackup=true"; + + PriorityBackupListener listener; + FailoverTransportFactory factory; + + Pointer<Transport> transport(factory.create(uri)); + CPPUNIT_ASSERT(transport != NULL); + transport->setTransportListener(&listener); + + FailoverTransport* failover = + dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport))); + + CPPUNIT_ASSERT(failover != NULL); + CPPUNIT_ASSERT(failover->isRandomize() == false); + CPPUNIT_ASSERT(failover->isPriorityBackup() == true); + + transport->start(); + + CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed()); + listener.reset(); + + CPPUNIT_ASSERT(failover->isConnected() == true); + CPPUNIT_ASSERT(failover->isConnectedToPriority() == false); + + broker1->start(); + broker1->waitUntilStarted(); + + CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption()); + CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed()); + listener.reset(); + + CPPUNIT_ASSERT(failover->isConnected() == true); + CPPUNIT_ASSERT(failover->isConnectedToPriority() == true); + + transport->close(); + + broker1->stop(); + broker1->waitUntilStopped(); + + broker2->stop(); + broker2->waitUntilStopped(); +} + +//////////////////////////////////////////////////////////////////////////////// +void FailoverTransportTest::testConnectsToPriorityAfterInitialBackupFails() { + + Pointer<MockBrokerService> broker1(new MockBrokerService(61626)); + Pointer<MockBrokerService> broker2(new MockBrokerService(61627)); + Pointer<MockBrokerService> broker3(new MockBrokerService(61628)); + + broker2->start(); + broker2->waitUntilStarted(); + + broker3->start(); + broker3->waitUntilStarted(); + + std::string uri = "failover://(tcp://localhost:61626?transport.useInactivityMonitor=false," + "tcp://localhost:61627?transport.useInactivityMonitor=false," + "tcp://localhost:61628?transport.useInactivityMonitor=false)?randomize=false&priorityBackup=true"; + + PriorityBackupListener listener; + FailoverTransportFactory factory; + + Pointer<Transport> transport(factory.create(uri)); + CPPUNIT_ASSERT(transport != NULL); + transport->setTransportListener(&listener); + + FailoverTransport* failover = + dynamic_cast<FailoverTransport*>(transport->narrow(typeid(FailoverTransport))); + + CPPUNIT_ASSERT(failover != NULL); + CPPUNIT_ASSERT(failover->isRandomize() == false); + CPPUNIT_ASSERT(failover->isPriorityBackup() == true); + + transport->start(); + + CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed()); + listener.reset(); + + CPPUNIT_ASSERT(failover->isConnected() == true); + CPPUNIT_ASSERT(failover->isConnectedToPriority() == false); + + Thread::sleep(100); + + broker1->start(); + broker1->waitUntilStarted(); + + broker2->stop(); + broker2->waitUntilStopped(); + + for (int i = 0; i < 2; ++i) { + + CPPUNIT_ASSERT_MESSAGE("Failed to get interrupted in time", listener.awaitInterruption()); + CPPUNIT_ASSERT_MESSAGE("Failed to get reconnected in time", listener.awaitResumed()); + listener.reset(); + + URI connectedURI = URI(transport->getRemoteAddress()); + + if (connectedURI.getPort() == broker1->getPort()) { + break; + } + } + + CPPUNIT_ASSERT(failover->isConnected() == true); + CPPUNIT_ASSERT(failover->isConnectedToPriority() == true); + + transport->close(); + + broker1->stop(); + broker1->waitUntilStopped(); + + broker2->stop(); + broker2->waitUntilStopped(); + + broker3->stop(); + broker3->waitUntilStopped(); +} http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/a8ff8b54/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h ---------------------------------------------------------------------- diff --git a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h index 9afd8a5..74d7813 100644 --- a/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h +++ b/activemq-cpp/src/test/activemq/transport/failover/FailoverTransportTest.h @@ -57,6 +57,9 @@ namespace failover { CPPUNIT_TEST( testMaxReconnectsZeroAttemptsOneConnect ); CPPUNIT_TEST( testMaxReconnectsHonorsConfiguration ); CPPUNIT_TEST( testStartupMaxReconnectsHonorsConfiguration ); + CPPUNIT_TEST( testConnectedToPriorityOnFirstTryThenFailover ); + CPPUNIT_TEST( testConnectsToPriorityOnceStarted ); + //CPPUNIT_TEST( testConnectsToPriorityAfterInitialBackupFails ); CPPUNIT_TEST_SUITE_END(); public: @@ -81,6 +84,9 @@ namespace failover { void testMaxReconnectsZeroAttemptsOneConnect(); void testMaxReconnectsHonorsConfiguration(); void testStartupMaxReconnectsHonorsConfiguration(); + void testConnectedToPriorityOnFirstTryThenFailover(); + void testConnectsToPriorityOnceStarted(); + void testConnectsToPriorityAfterInitialBackupFails(); private:
