Author: tabish
Date: Wed Mar 30 19:24:22 2011
New Revision: 1087050
URL: http://svn.apache.org/viewvc?rev=1087050&view=rev
Log:
fix for https://issues.apache.org/jira/browse/AMQCPP-358
Adds some small refactoring to support future work needed.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=1087050&r1=1087049&r2=1087050&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
Wed Mar 30 19:24:22 2011
@@ -551,7 +551,6 @@ void FailoverTransport::processNewTransp
URI uri( str );
list.add( uri );
} catch( Exception e ) {
- //LOG.error( "Failed to parse broker address: " + str, e );
}
}
@@ -559,7 +558,6 @@ void FailoverTransport::processNewTransp
try {
updateURIs( rebalance, list );
} catch( IOException e ) {
- //LOG.error( "Failed to update transport URI's from: " +
newTransports, e );
}
}
}
@@ -615,7 +613,22 @@ bool FailoverTransport::isPending() cons
synchronized( &reconnectMutex ) {
if( this->connectedTransport == NULL && !closed && started ) {
- result = true;
+
+ int reconnectAttempts = 0;
+ if( firstConnection ) {
+ if( startupMaxReconnectAttempts != 0 ) {
+ reconnectAttempts = startupMaxReconnectAttempts;
+ }
+ }
+ if( reconnectAttempts == 0 ) {
+ reconnectAttempts = maxReconnectAttempts;
+ }
+
+ if( reconnectAttempts > 0 && connectFailures >= reconnectAttempts
) {
+ result = false;
+ } else {
+ result = true;
+ }
}
}
@@ -802,8 +815,6 @@ bool FailoverTransport::iterate() {
if( !closed ) {
synchronized( &sleepMutex ) {
- //std::cout << "Failover: Trying again in "
- // << reconnectDelay << "Milliseconds." << std::endl;
sleepMutex.wait( (unsigned int)reconnectDelay );
}
@@ -848,3 +859,219 @@ void FailoverTransport::setConnectionInt
stateTracker.connectionInterruptProcessingComplete( this, connectionId
);
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isConnected() const {
+ return this->connected;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isClosed() const {
+ return this->closed;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isInitialized() const {
+ return this->initialized;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setInitialized( bool value ) {
+ this->initialized = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Transport* FailoverTransport::narrow( const std::type_info& typeId ) {
+
+ if( typeid( *this ) == typeId ) {
+ return this;
+ }
+
+ if( this->connectedTransport != NULL ) {
+ return this->connectedTransport->narrow( typeId );
+ }
+
+ return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::processResponse(const Pointer<Response>& response) {
+
+ Pointer<Command> object;
+
+ synchronized( &( this->requestMap ) ) {
+ try{
+ object = this->requestMap.remove( response->getCorrelationId() );
+ } catch( NoSuchElementException& ex ) {
+ // Not tracking this request in our map, not an error.
+ }
+ }
+
+ if( object != NULL ) {
+ try{
+ Pointer<Tracked> tracked = object.dynamicCast<Tracked>();
+ tracked->onResponse();
+ }
+ AMQ_CATCH_NOTHROW( ClassCastException )
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getTimeout() const {
+ return this->timeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setTimeout( long long value ) {
+ this->timeout = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getInitialReconnectDelay() const {
+ return this->initialReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setInitialReconnectDelay( long long value ) {
+ this->initialReconnectDelay = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getMaxReconnectDelay() const {
+ return this->maxReconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxReconnectDelay( long long value ) {
+ this->maxReconnectDelay = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getBackOffMultiplier() const {
+ return this->backOffMultiplier;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setBackOffMultiplier( long long value ) {
+ this->backOffMultiplier = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isUseExponentialBackOff() const {
+ return this->useExponentialBackOff;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setUseExponentialBackOff( bool value ) {
+ this->useExponentialBackOff = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isRandomize() const {
+ return this->uris->isRandomize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setRandomize( bool value ) {
+ this->uris->setRandomize( value );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getMaxReconnectAttempts() const {
+ return this->maxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxReconnectAttempts( int value ) {
+ this->maxReconnectAttempts = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getStartupMaxReconnectAttempts() const {
+ return this->startupMaxReconnectAttempts;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setStartupMaxReconnectAttempts( int value ) {
+ this->startupMaxReconnectAttempts = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long FailoverTransport::getReconnectDelay() const {
+ return this->reconnectDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setReconnectDelay( long long value ) {
+ this->reconnectDelay = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isBackup() const {
+ return this->backups->isEnabled();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setBackup( bool value ) {
+ this->backups->setEnabled( value );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getBackupPoolSize() const {
+ return this->backups->getBackupPoolSize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setBackupPoolSize( int value ) {
+ this->backups->setBackupPoolSize( value );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isTrackMessages() const {
+ return this->trackMessages;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setTrackMessages( bool value ) {
+ this->trackMessages = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isTrackTransactionProducers() const {
+ return this->trackTransactionProducers;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setTrackTransactionProducers( bool value ) {
+ this->trackTransactionProducers = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FailoverTransport::getMaxCacheSize() const {
+ return this->maxCacheSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setMaxCacheSize( int value ) {
+ this->maxCacheSize = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isReconnectSupported() const {
+ return this->reconnectSupported;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setReconnectSupported( bool value ) {
+ this->reconnectSupported = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FailoverTransport::isUpdateURIsSupported() const {
+ return this->updateURIsSupported;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setUpdateURIsSupported( bool value ) {
+ this->updateURIsSupported = value;
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=1087050&r1=1087049&r2=1087050&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
Wed Mar 30 19:24:22 2011
@@ -155,33 +155,15 @@ namespace failover {
return true;
}
- virtual bool isConnected() const {
- return this->connected;
- }
+ virtual bool isConnected() const;
- virtual bool isClosed() const {
- return this->closed;
- }
+ virtual bool isClosed() const;
- bool isInitialized() const {
- return this->initialized;
- }
-
- void setInitialized( bool value ) {
- this->initialized = value;
- }
+ bool isInitialized() const;
- virtual Transport* narrow( const std::type_info& typeId ) {
- if( typeid( *this ) == typeId ) {
- return this;
- }
-
- if( this->connectedTransport != NULL ) {
- return this->connectedTransport->narrow( typeId );
- }
+ void setInitialized( bool value );
- return NULL;
- }
+ virtual Transport* narrow( const std::type_info& typeId );
virtual std::string getRemoteAddress() const;
@@ -208,133 +190,69 @@ namespace failover {
public: // FailoverTransport Property Getters / Setters
- long long getTimeout() const {
- return this->timeout;
- }
+ long long getTimeout() const;
- void setTimeout( long long value ) {
- this->timeout = value;
- }
+ void setTimeout( long long value );
- long long getInitialReconnectDelay() const {
- return this->initialReconnectDelay;
- }
+ long long getInitialReconnectDelay() const;
- void setInitialReconnectDelay( long long value ) {
- this->initialReconnectDelay = value;
- }
+ void setInitialReconnectDelay( long long value );
- long long getMaxReconnectDelay() const {
- return this->maxReconnectDelay;
- }
+ long long getMaxReconnectDelay() const;
- void setMaxReconnectDelay( long long value ) {
- this->maxReconnectDelay = value;
- }
+ void setMaxReconnectDelay( long long value );
- long long getBackOffMultiplier() const {
- return this->backOffMultiplier;
- }
+ long long getBackOffMultiplier() const;
- void setBackOffMultiplier( long long value ) {
- this->backOffMultiplier = value;
- }
+ void setBackOffMultiplier( long long value );
- bool isUseExponentialBackOff() const {
- return this->useExponentialBackOff;
- }
+ bool isUseExponentialBackOff() const;
- void setUseExponentialBackOff( bool value ) {
- this->useExponentialBackOff = value;
- }
+ void setUseExponentialBackOff( bool value );
- bool isRandomize() const {
- return this->uris->isRandomize();
- }
+ bool isRandomize() const;
- void setRandomize( bool value ) {
- this->uris->setRandomize( value );
- }
+ void setRandomize( bool value );
- int getMaxReconnectAttempts() const {
- return this->maxReconnectAttempts;
- }
+ int getMaxReconnectAttempts() const;
- void setMaxReconnectAttempts( int value ) {
- this->maxReconnectAttempts = value;
- }
+ void setMaxReconnectAttempts( int value );
- int getStartupMaxReconnectAttempts() const {
- return this->startupMaxReconnectAttempts;
- }
+ int getStartupMaxReconnectAttempts() const;
- void setStartupMaxReconnectAttempts( int value ) {
- this->startupMaxReconnectAttempts = value;
- }
+ void setStartupMaxReconnectAttempts( int value );
- long long getReconnectDelay() const {
- return this->reconnectDelay;
- }
+ long long getReconnectDelay() const;
- void setReconnectDelay( long long value ) {
- this->reconnectDelay = value;
- }
+ void setReconnectDelay( long long value );
- bool isBackup() const {
- return this->backups->isEnabled();
- }
+ bool isBackup() const;
- void setBackup( bool value ) {
- this->backups->setEnabled( value );
- }
+ void setBackup( bool value );
- int getBackupPoolSize() const {
- return this->backups->getBackupPoolSize();
- }
+ int getBackupPoolSize() const;
- void setBackupPoolSize( int value ) {
- this->backups->setBackupPoolSize( value );
- }
+ void setBackupPoolSize( int value );
- bool isTrackMessages() const {
- return this->trackMessages;
- }
+ bool isTrackMessages() const;
- void setTrackMessages( bool value ) {
- this->trackMessages = value;
- }
+ void setTrackMessages( bool value );
- bool isTrackTransactionProducers() const {
- return this->trackTransactionProducers;
- }
+ bool isTrackTransactionProducers() const;
- void setTrackTransactionProducers( bool value ) {
- this->trackTransactionProducers = value;
- }
+ void setTrackTransactionProducers( bool value );
- int getMaxCacheSize() const {
- return this->maxCacheSize;
- }
+ int getMaxCacheSize() const;
- void setMaxCacheSize( int value ) {
- this->maxCacheSize = value;
- }
+ void setMaxCacheSize( int value );
- bool isReconnectSupported() const {
- return this->reconnectSupported;
- }
+ bool isReconnectSupported() const;
- void setReconnectSupported( bool value ) {
- this->reconnectSupported = value;
- }
+ void setReconnectSupported( bool value );
- bool isUpdateURIsSupported() const {
- return this->updateURIsSupported;
- }
+ bool isUpdateURIsSupported() const;
- void setUpdateURIsSupported( bool value ) {
- this->updateURIsSupported = value;
- }
+ void setUpdateURIsSupported( bool value );
void setConnectionInterruptProcessingComplete( const
Pointer<commands::ConnectionId>& connectionId );
@@ -382,6 +300,8 @@ namespace failover {
void processNewTransports( bool rebalance, std::string newTransports );
+ void processResponse(const Pointer<Response>& response);
+
};
}}}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp?rev=1087050&r1=1087049&r2=1087050&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportListener.cpp
Wed Mar 30 19:24:22 2011
@@ -53,26 +53,8 @@ void FailoverTransportListener::onComman
}
if( command->isResponse() ) {
-
- Pointer<Response> response =
- command.dynamicCast<Response>();
- Pointer<Command> object;
-
- synchronized( &( parent->requestMap ) ) {
- try{
- object = parent->requestMap.remove(
response->getCorrelationId() );
- } catch( NoSuchElementException& ex ) {
- // Not tracking this request in our map, not an error.
- }
- }
-
- if( object != NULL ) {
- try{
- Pointer<Tracked> tracked = object.dynamicCast<Tracked>();
- tracked->onResponse();
- }
- AMQ_CATCH_NOTHROW( ClassCastException )
- }
+ Pointer<Response> response = command.dynamicCast<Response>();
+ parent->processResponse(response);
}
if( !parent->isInitialized() ) {
@@ -83,8 +65,8 @@ void FailoverTransportListener::onComman
parent->handleConnectionControl( command );
}
- if( parent->transportListener != NULL ) {
- parent->transportListener->onCommand( command );
+ if( parent->getTransportListener() != NULL ) {
+ parent->getTransportListener()->onCommand( command );
}
}
@@ -93,22 +75,22 @@ void FailoverTransportListener::onExcept
try {
parent->handleTransportFailure( ex );
} catch( Exception& e ) {
- if( parent->transportListener != NULL ) {
- parent->transportListener->onException( e );
+ if( parent->getTransportListener() != NULL ) {
+ parent->getTransportListener()->onException( e );
}
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransportListener::transportInterrupted() {
- if( parent->transportListener != NULL ) {
- parent->transportListener->transportInterrupted();
+ if( parent->getTransportListener() != NULL ) {
+ parent->getTransportListener()->transportInterrupted();
}
}
////////////////////////////////////////////////////////////////////////////////
void FailoverTransportListener::transportResumed() {
- if( parent->transportListener != NULL ) {
- parent->transportListener->transportResumed();
+ if( parent->getTransportListener() != NULL ) {
+ parent->getTransportListener()->transportResumed();
}
}