Author: tabish
Date: Fri Feb 5 00:45:07 2010
New Revision: 906751
URL: http://svn.apache.org/viewvc?rev=906751&view=rev
Log:
Updates to the State Tracker and Failover code to improve recovery.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.cpp
Fri Feb 5 00:45:07 2010
@@ -80,6 +80,7 @@
// Copy the data of the base class or classes
BaseCommand::copyDataStructure( src );
+ this->setDestination( srcPtr->getDestination() );
this->setClose( srcPtr->isClose() );
this->setConsumerId( srcPtr->getConsumerId() );
this->setPrefetch( srcPtr->getPrefetch() );
@@ -102,6 +103,13 @@
<< "commandId = " << this->getCommandId() << ", "
<< "responseRequired = " << boolalpha << this->isResponseRequired();
stream << ", ";
+ stream << "Destination = ";
+ if( this->getDestination() != NULL ) {
+ stream << this->getDestination()->toString();
+ } else {
+ stream << "NULL";
+ }
+ stream << ", ";
stream << "Close = " << this->isClose();
stream << ", ";
stream << "ConsumerId = ";
@@ -136,6 +144,13 @@
return false;
}
+ if( this->getDestination() != NULL ) {
+ if( !this->getDestination()->equals( valuePtr->getDestination().get()
) ) {
+ return false;
+ }
+ } else if( valuePtr->getDestination() != NULL ) {
+ return false;
+ }
if( this->isClose() != valuePtr->isClose() ) {
return false;
}
@@ -165,6 +180,21 @@
}
////////////////////////////////////////////////////////////////////////////////
+const decaf::lang::Pointer<ActiveMQDestination>&
ConsumerControl::getDestination() const {
+ return destination;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Pointer<ActiveMQDestination>& ConsumerControl::getDestination() {
+ return destination;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConsumerControl::setDestination( const
decaf::lang::Pointer<ActiveMQDestination>& destination ) {
+ this->destination = destination;
+}
+
+////////////////////////////////////////////////////////////////////////////////
bool ConsumerControl::isClose() const {
return close;
}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ConsumerControl.h
Fri Feb 5 00:45:07 2010
@@ -23,6 +23,7 @@
#pragma warning( disable : 4290 )
#endif
+#include <activemq/commands/ActiveMQDestination.h>
#include <activemq/commands/BaseCommand.h>
#include <activemq/commands/ConsumerId.h>
#include <activemq/util/Config.h>
@@ -47,6 +48,7 @@
class AMQCPP_API ConsumerControl : public BaseCommand {
protected:
+ Pointer<ActiveMQDestination> destination;
bool close;
Pointer<ConsumerId> consumerId;
int prefetch;
@@ -105,6 +107,10 @@
*/
virtual bool equals( const DataStructure* value ) const;
+ virtual const Pointer<ActiveMQDestination>& getDestination() const;
+ virtual Pointer<ActiveMQDestination>& getDestination();
+ virtual void setDestination( const Pointer<ActiveMQDestination>&
destination );
+
virtual bool isClose() const;
virtual void setClose( bool close );
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Fri Feb 5 00:45:07 2010
@@ -25,10 +25,13 @@
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/exceptions/BrokerException.h>
#include <activemq/util/CMSExceptionSupport.h>
+#include <activemq/transport/failover/FailoverTransport.h>
+#include <decaf/lang/Math.h>
#include <decaf/lang/Boolean.h>
#include <decaf/util/Iterator.h>
#include <decaf/util/UUID.h>
+#include <decaf/util/concurrent/TimeUnit.h>
#include <activemq/commands/Command.h>
#include <activemq/commands/ActiveMQMessage.h>
@@ -56,9 +59,12 @@
using namespace activemq::core;
using namespace activemq::commands;
using namespace activemq::exceptions;
+using namespace activemq::transport;
+using namespace activemq::transport::failover;
using namespace decaf;
using namespace decaf::io;
using namespace decaf::util;
+using namespace decaf::util::concurrent;
using namespace decaf::lang;
using namespace decaf::lang::exceptions;
@@ -224,7 +230,12 @@
return;
}
- // Indicates we are on the way out to squelch any exceptions getting
+ // If we are still running, let's stop first.
+ if( !this->transportFailed.get() ) {
+ this->stop();
+ }
+
+ // Indicates we are on the way out to suppress any exceptions getting
// passed on from the transport as it goes down.
this->closing.set( true );
@@ -234,18 +245,23 @@
allSessions = activeSessions.toArray();
}
+ long long lastDeliveredSequenceId = 0;
+
// Close all of the resources.
for( unsigned int ix=0; ix<allSessions.size(); ++ix ){
- cms::Session* session = allSessions[ix];
+ ActiveMQSession* session = allSessions[ix];
try{
session->close();
+
+ lastDeliveredSequenceId =
+ Math::max( lastDeliveredSequenceId,
session->getLastDeliveredSequenceId() );
} catch( cms::CMSException& ex ){
/* Absorb */
}
}
// Now inform the Broker we are shutting down.
- this->disconnect();
+ this->disconnect( lastDeliveredSequenceId );
// Once current deliveries are done this stops the delivery
// of any new messages.
@@ -261,15 +277,16 @@
enforceConnected();
- // This starts or restarts the delivery of all incomming messages
+ // This starts or restarts the delivery of all incoming messages
// messages delivered while this connection is stopped are dropped
// and not acknowledged.
- this->started.set( true );
+ if( this->started.compareAndSet( false, true ) ) {
- // Start all the sessions.
- std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
- for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
- sessions[ix]->start();
+ // Start all the sessions.
+ std::vector<ActiveMQSession*> sessions = activeSessions.toArray();
+ for( unsigned int ix=0; ix<sessions.size(); ++ix ) {
+ sessions[ix]->start();
+ }
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -284,12 +301,12 @@
// Once current deliveries are done this stops the delivery of any
// new messages.
- this->started.set( false );
-
- std::auto_ptr< Iterator<ActiveMQSession*> > iter(
activeSessions.iterator() );
+ if( this->started.compareAndSet( true, false ) ) {
+ std::auto_ptr< Iterator<ActiveMQSession*> > iter(
activeSessions.iterator() );
- while( iter->hasNext() ){
- iter->next()->stop();
+ while( iter->hasNext() ){
+ iter->next()->stop();
+ }
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -329,13 +346,14 @@
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::disconnect() throw (
activemq::exceptions::ActiveMQException ) {
+void ActiveMQConnection::disconnect( long long lastDeliveredSequenceId )
+ throw ( activemq::exceptions::ActiveMQException ) {
try{
// Remove our ConnectionId from the Broker
- Pointer<RemoveInfo> command( new RemoveInfo() );
- command->setObjectId( this->connectionInfo->getConnectionId() );
+ Pointer<RemoveInfo> command(
this->connectionInfo->createRemoveCommand() );
+ command->setLastDeliveredSequenceId( lastDeliveredSequenceId );
this->syncRequest( command, this->getCloseTimeout() );
// Send the disconnect command to the broker.
@@ -440,6 +458,9 @@
Pointer<MessageDispatch> dispatch =
command.dynamicCast<MessageDispatch>();
+ // Check first to see if we are recovering.
+ waitForTransportInterruptionProcessingToComplete();
+
// Check for an empty Message, shouldn't ever happen but who knows.
if( dispatch->getMessage() == NULL ) {
throw ActiveMQException(
@@ -527,6 +548,9 @@
return;
}
+ // Mark this Connection as having a Failed transport.
+ this->transportFailed.set( true );
+
// Inform the user of the error.
fire( exceptions::ActiveMQException( ex ) );
@@ -548,6 +572,8 @@
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConnection::transportInterrupted() {
+ transportInterruptionProcessingComplete.reset( new CountDownLatch(
dispatchers.size() ) );
+
synchronized( &activeSessions ) {
std::auto_ptr< Iterator<ActiveMQSession*> > iter(
this->activeSessions.iterator() );
@@ -694,3 +720,58 @@
transportListeners.remove( transportListener );
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::waitForTransportInterruptionProcessingToComplete()
+ throw( decaf::lang::exceptions::InterruptedException ) {
+
+ if( transportInterruptionProcessingComplete != NULL ) {
+
+ while( !closed.get() && !transportFailed.get() &&
+ !transportInterruptionProcessingComplete->await( 15,
TimeUnit::SECONDS) ) {
+
+ //LOG.warn( "dispatch paused, waiting for outstanding dispatch
interruption processing (" +
+ // transportInterruptionProcessingComplete.getCount() +
") to complete..");
+ }
+
+ signalInterruptionProcessingComplete();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setTransportInterruptionProcessingComplete() {
+
+ synchronized( &mutex ) {
+
+ if( transportInterruptionProcessingComplete != NULL ) {
+ transportInterruptionProcessingComplete->countDown();
+
+ try {
+ signalInterruptionProcessingComplete();
+ } catch( InterruptedException& ignored ) {}
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::signalInterruptionProcessingComplete()
+ throw( decaf::lang::exceptions::InterruptedException ) {
+
+ if( transportInterruptionProcessingComplete->await( 0, TimeUnit::SECONDS )
) {
+ synchronized( &mutex ) {
+
+ transportInterruptionProcessingComplete.reset( NULL );
+ FailoverTransport* failoverTransport =
+ dynamic_cast<FailoverTransport*>( this->getTransport().narrow(
typeid( FailoverTransport ) ) );
+
+ if( failoverTransport != NULL ) {
+ failoverTransport->setConnectionInterruptProcessingComplete(
+ this->connectionInfo->getConnectionId() );
+
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("transportInterruptionProcessingComplete for:
" + this.getConnectionInfo().getConnectionId());
+ //}
+ }
+ }
+ }
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
Fri Feb 5 00:45:07 2010
@@ -35,6 +35,7 @@
#include <decaf/util/Properties.h>
#include <decaf/util/StlMap.h>
#include <decaf/util/StlSet.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/lang/exceptions/UnsupportedOperationException.h>
#include <decaf/lang/exceptions/NullPointerException.h>
@@ -48,6 +49,7 @@
using decaf::lang::Pointer;
using decaf::util::concurrent::atomic::AtomicBoolean;
+ using decaf::util::concurrent::CountDownLatch;
class ActiveMQSession;
class ActiveMQProducer;
@@ -104,6 +106,11 @@
AtomicBoolean closing;
/**
+ * Indicates that this connection's Transport has failed.
+ */
+ AtomicBoolean transportFailed;
+
+ /**
* Map of message dispatchers indexed by consumer id.
*/
DispatcherMap dispatchers;
@@ -138,6 +145,12 @@
*/
Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
+ /**
+ * Latch used to track completion of recovery of consumers
+ * after a Connection Interrupted event.
+ */
+ Pointer<CountDownLatch> transportInterruptionProcessingComplete;
+
public:
/**
@@ -217,6 +230,14 @@
}
/**
+ * Checks if the Connection's Transport has failed
+ * @return true if the Connection's Transport has failed.
+ */
+ bool isTransportFailed() const {
+ return this->transportFailed.get();
+ }
+
+ /**
* Requests that the Broker removes the given Destination. Calling
this
* method implies that the client is finished with the Destination and
that
* no other messages will be sent or received for the given
Destination. The
@@ -427,17 +448,31 @@
*/
virtual void fire( const exceptions::ActiveMQException& ex );
+ /**
+ * Indicates that a Connection resource that is processing the
transportInterrupted
+ * event has completed.
+ */
+ void setTransportInterruptionProcessingComplete();
+
private:
// Sends the connect message to the broker and waits for the response.
void connect() throw ( activemq::exceptions::ActiveMQException );
// Sends a oneway disconnect message to the broker.
- void disconnect() throw ( activemq::exceptions::ActiveMQException );
+ void disconnect( long long lastDeliveredSequenceId ) throw (
activemq::exceptions::ActiveMQException );
// Check for Connected State and Throw an exception if not.
void enforceConnected() const throw (
activemq::exceptions::ActiveMQException );
+ // Waits for all Consumers to handle the Transport Interrupted event.
+ void waitForTransportInterruptionProcessingToComplete()
+ throw ( decaf::lang::exceptions::InterruptedException );
+
+ // Marks processing complete for a single caller when interruption
processing completes.
+ void signalInterruptionProcessingComplete()
+ throw ( decaf::lang::exceptions::InterruptedException );
+
};
}}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
Fri Feb 5 00:45:07 2010
@@ -25,6 +25,7 @@
////////////////////////////////////////////////////////////////////////////////
ConnectionState::ConnectionState( const Pointer<ConnectionInfo>& info ) :
disposed( false ) {
+ this->connectionInterruptProcessingComplete = true;
this->info = info;
}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.h
Fri Feb 5 00:45:07 2010
@@ -30,6 +30,7 @@
#include <activemq/state/SessionState.h>
#include <activemq/state/TransactionState.h>
+#include <decaf/util/StlMap.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
#include <decaf/util/concurrent/ConcurrentStlMap.h>
#include <decaf/util/StlList.h>
@@ -58,6 +59,11 @@
StlList< Pointer<DestinationInfo> > tempDestinations;
decaf::util::concurrent::atomic::AtomicBoolean disposed;
+ bool connectionInterruptProcessingComplete;
+ StlMap< Pointer<ConsumerId>,
+ Pointer<ConsumerInfo>,
+ ConsumerId::COMPARATOR > recoveringPullConsumers;
+
public:
ConnectionState( const Pointer<ConnectionInfo>& info );
@@ -133,6 +139,18 @@
return sessions.values();
}
+ /**
+ * Gives access to the map of consumers that were re-subscribed with a
+ * prefetch of zero while interruption processing was still pending.
+ * The map is returned by reference so that put() / clear() calls made
+ * through this accessor act on this ConnectionState's own map instead
+ * of on a temporary copy that is discarded at the end of the statement.
+ */
+ StlMap< Pointer<ConsumerId>, Pointer<ConsumerInfo>, ConsumerId::COMPARATOR >& getRecoveringPullConsumers() {
+ return recoveringPullConsumers;
+ }
+
+ void setConnectionInterruptProcessingComplete(bool
connectionInterruptProcessingComplete) {
+ this->connectionInterruptProcessingComplete =
connectionInterruptProcessingComplete;
+ }
+
+ bool isConnectionInterruptProcessingComplete() {
+ return this->connectionInterruptProcessingComplete;
+ }
+
};
}}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.cpp
Fri Feb 5 00:45:07 2010
@@ -20,7 +20,13 @@
#include <decaf/lang/Runnable.h>
#include <decaf/lang/exceptions/NoSuchElementException.h>
+#include <activemq/commands/ConsumerControl.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/core/ActiveMQConstants.h>
+#include <activemq/transport/TransportListener.h>
+
using namespace activemq;
+using namespace activemq::core;
using namespace activemq::state;
using namespace activemq::commands;
using namespace activemq::exceptions;
@@ -30,8 +36,6 @@
using namespace decaf::lang::exceptions;
////////////////////////////////////////////////////////////////////////////////
-
-////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace state {
@@ -67,6 +71,7 @@
this->restoreProducers = true;
this->restoreTransaction = true;
this->trackMessages = true;
+ this->trackTransactionProducers = true;
this->maxCacheSize = 128 * 1024;
this->currentCacheSize = 0;
}
@@ -153,6 +158,8 @@
try{
+ std::vector< Pointer<Command> > toIgnore;
+
// Restore the session's transaction state
std::vector< Pointer<TransactionState> > transactionStates =
connectionState->getTransactionStates();
@@ -160,14 +167,60 @@
std::vector< Pointer<TransactionState> >::const_iterator iter =
transactionStates.begin();
for( ; iter != transactionStates.end(); ++iter ) {
- Pointer<TransactionState> state = *iter;
+
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("tx: " + transactionState.getId());
+ //}
+
+ // ignore any empty (ack) transaction
+ if( (*iter)->getCommands().size() == 2 ) {
+ Pointer<Command> lastCommand = (*iter)->getCommands().get(1);
+ if( lastCommand->isTransactionInfo() ) {
+ Pointer<TransactionInfo> transactionInfo =
lastCommand.dynamicCast<TransactionInfo>();
+
+ if( transactionInfo->getType() ==
ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE ) {
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("not replaying empty (ack) tx: " +
transactionState.getId());
+ //}
+ toIgnore.push_back(lastCommand);
+ continue;
+ }
+ }
+ }
+
+ // replay short lived producers that may have been involved in the
transaction
+ std::vector< Pointer<ProducerState> > producerStates =
(*iter)->getProducerStates();
+ std::vector< Pointer<ProducerState> >::const_iterator state =
producerStates.begin();
+
+ for( ; state != producerStates.end(); ++state ) {
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("tx replay producer :" +
producerState.getInfo());
+ //}
+ transport->oneway( (*state)->getInfo() );
+ }
std::auto_ptr< Iterator< Pointer<Command> > > commands(
- state->getCommands().iterator() );
+ (*iter)->getCommands().iterator() );
while( commands->hasNext() ) {
transport->oneway( commands->next() );
}
+
+ state = producerStates.begin();
+ for( ; state != producerStates.end(); ++state ) {
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("tx remove replayed producer :" +
producerState.getInfo());
+ //}
+ transport->oneway( (*state)->getInfo()->createRemoveCommand()
);
+ }
+ }
+
+ std::vector< Pointer<Command> >::const_iterator command =
toIgnore.begin();
+ for( ; command != toIgnore.end(); ++command ) {
+ // respond to the outstanding commit
+ Pointer<Response> response( new Response() );
+ response->setCorrelationId( (*command)->getCommandId() );
+ transport->getTransportListener()->onCommand( response );
}
}
AMQ_CATCH_RETHROW( IOException )
@@ -212,14 +265,27 @@
try{
- // Restore the session's consumers
+ // Restore the session's consumers, but possibly in pull-only (prefetch 0)
+ // state until recovery is complete.
+ Pointer<ConnectionState> connectionState =
+ connectionStates.get(
sessionState->getInfo()->getSessionId()->getParentId() );
+ bool connectionInterruptionProcessingComplete =
+ connectionState->isConnectionInterruptProcessingComplete();
+
std::vector< Pointer<ConsumerState> > consumerStates =
sessionState->getConsumerStates();
+ std::vector< Pointer<ConsumerState> >::const_iterator state =
consumerStates.begin();
- std::vector< Pointer<ConsumerState> >::const_iterator iter =
consumerStates.begin();
+ for( ; state != consumerStates.end(); ++state ) {
- for( ; iter != consumerStates.end(); ++iter ) {
- Pointer<ConsumerState> state = *iter;
- transport->oneway( state->getInfo() );
+ Pointer<ConsumerInfo> infoToSend = (*state)->getInfo();
+
+ if( !connectionInterruptionProcessingComplete &&
infoToSend->getPrefetchSize() > 0) {
+
+ infoToSend.reset( (*state)->getInfo()->cloneDataStructure() );
+ connectionState->getRecoveringPullConsumers().put(
infoToSend->getConsumerId(), (*state)->getInfo() );
+ infoToSend->setPrefetchSize(0);
+ }
+
+ transport->oneway( infoToSend );
}
}
AMQ_CATCH_RETHROW( IOException )
@@ -501,10 +567,12 @@
throw ( activemq::exceptions::ActiveMQException ) {
try{
+
if( message != NULL ) {
if( trackTransactions && message->getTransactionId() != NULL ) {
- Pointer<ConnectionId> connectionId =
- message->getProducerId()->getParentId()->getParentId();
+ Pointer<ProducerId> producerId = message->getProducerId();
+ Pointer<ConnectionId> connectionId =
producerId->getParentId()->getParentId();
+
if( connectionId != NULL ) {
Pointer<ConnectionState> cs = connectionStates.get(
connectionId );
if( cs != NULL ) {
@@ -513,6 +581,13 @@
if( transactionState != NULL ) {
transactionState->addCommand(
Pointer<Command>(
message->cloneDataStructure() ) );
+
+ if( trackTransactionProducers ) {
+ // Track the producer in case it is closed
before a commit
+ Pointer<SessionState> sessionState =
cs->getSessionState( producerId->getParentId() );
+ Pointer<ProducerState> producerState =
sessionState->getProducerState(producerId);
+
producerState->setTransactionState(transactionState);
+ }
}
}
}
@@ -740,3 +815,55 @@
AMQ_CATCHALL_THROW( ActiveMQException )
}
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::connectionInterruptProcessingComplete(
+ transport::Transport* transport, const Pointer<ConnectionId>& connectionId
) {
+
+ Pointer<ConnectionState> connectionState = connectionStates.get(
connectionId );
+
+ if( connectionState != NULL ) {
+
+ connectionState->setConnectionInterruptProcessingComplete( true );
+
+ StlMap< Pointer<ConsumerId>, Pointer<ConsumerInfo>,
ConsumerId::COMPARATOR > stalledConsumers =
+ connectionState->getRecoveringPullConsumers();
+
+ std::vector< Pointer<ConsumerId> > keySet = stalledConsumers.keySet();
+ std::vector< Pointer<ConsumerId> >::const_iterator key =
keySet.begin();
+
+ for( ; key != keySet.end(); ++key ) {
+ Pointer<ConsumerControl> control( new ConsumerControl() );
+
+ control->setConsumerId( *key );
+ control->setPrefetch( stalledConsumers.get( *key
)->getPrefetchSize() );
+ control->setDestination( stalledConsumers.get( *key
)->getDestination() );
+
+ try {
+
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("restored recovering consumer: " +
control.getConsumerId() +
+ // " with: " + control.getPrefetch());
+ //}
+ transport->oneway( control );
+ } catch( Exception& ex ) {
+ //if( LOG.isDebugEnabled() ) {
+ // LOG.debug("Failed to submit control for consumer: " +
control.getConsumerId() +
+ // " with: " + control.getPrefetch(), ex);
+ //}
+ }
+ }
+
+ stalledConsumers.clear();
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ConnectionStateTracker::transportInterrupted() {
+
+ std::vector< Pointer<ConnectionState> > connectionStatesVec =
this->connectionStates.values();
+ std::vector< Pointer<ConnectionState> >::const_iterator state =
connectionStatesVec.begin();
+
+ for( ; state != connectionStatesVec.end(); ++state ) {
+ (*state)->setConnectionInterruptProcessingComplete( false );
+ }
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionStateTracker.h
Fri Feb 5 00:45:07 2010
@@ -63,6 +63,7 @@
bool restoreProducers;
bool restoreTransaction;
bool trackMessages;
+ bool trackTransactionProducers;
int maxCacheSize;
int currentCacheSize;
@@ -81,6 +82,11 @@
void restore( const Pointer<transport::Transport>& transport )
throw( decaf::io::IOException );
+ void connectionInterruptProcessingComplete(
+ transport::Transport* transport, const Pointer<ConnectionId>&
connectionId );
+
+ void transportInterrupted();
+
virtual Pointer<Command> processDestinationInfo( DestinationInfo* info
)
throw ( exceptions::ActiveMQException );
@@ -191,6 +197,14 @@
this->maxCacheSize = maxCacheSize;
}
+ bool isTrackTransactionProducers() const {
+ return this->trackTransactionProducers;
+ }
+
+ void setTrackTransactionProducers( bool trackTransactionProducers ) {
+ this->trackTransactionProducers = trackTransactionProducers;
+ }
+
private:
void doRestoreTransactions( const Pointer<transport::Transport>&
transport,
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.cpp
Fri Feb 5 00:45:07 2010
@@ -17,6 +17,8 @@
#include "ProducerState.h"
+#include <activemq/state/TransactionState.h>
+
using namespace activemq;
using namespace activemq::state;
using namespace activemq::commands;
@@ -39,3 +41,13 @@
return "NULL";
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ProducerState::setTransactionState( const Pointer<TransactionState>&
transactionState ) {
+ this->transactionState = transactionState;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<TransactionState> ProducerState::getTransactionState() const {
+ return this->transactionState;
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ProducerState.h
Fri Feb 5 00:45:07 2010
@@ -32,10 +32,13 @@
using namespace decaf::lang;
using namespace activemq::commands;
+ class TransactionState;
+
class AMQCPP_API ProducerState {
private:
Pointer<ProducerInfo> info;
+ Pointer<TransactionState> transactionState;
public:
@@ -49,6 +52,10 @@
return this->info;
}
+ void setTransactionState( const Pointer<TransactionState>&
transactionState );
+
+ Pointer<TransactionState> getTransactionState() const;
+
};
}}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.cpp
Fri Feb 5 00:45:07 2010
@@ -17,6 +17,8 @@
#include "SessionState.h"
+#include <activemq/state/TransactionState.h>
+
#include <decaf/lang/exceptions/IllegalStateException.h>
using namespace activemq;
@@ -52,3 +54,24 @@
__FILE__, __LINE__, "Session already Disposed" );
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+void SessionState::addProducer( const Pointer<ProducerInfo>& info ) {
+ checkShutdown();
+ producers.put( info->getProducerId(),
+ Pointer<ProducerState>( new ProducerState( info ) ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<ProducerState> SessionState::removeProducer( const
Pointer<ProducerId>& id ) {
+
+ Pointer<ProducerState> producerState = producers.remove( id );
+ if( producerState != NULL ) {
+ if( producerState->getTransactionState() != NULL ) {
+ // allow the transaction to recreate dependent producer on recovery
+ producerState->getTransactionState()->addProducerState(
producerState );
+ }
+ }
+
+ return producerState;
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/SessionState.h
Fri Feb 5 00:45:07 2010
@@ -66,15 +66,9 @@
return this->info;
}
- void addProducer( const Pointer<ProducerInfo>& info ) {
- checkShutdown();
- producers.put( info->getProducerId(),
- Pointer<ProducerState>( new ProducerState( info ) ) );
- }
+ void addProducer( const Pointer<ProducerInfo>& info );
- Pointer<ProducerState> removeProducer( const Pointer<ProducerId>& id) {
- return producers.remove( id );
- }
+ Pointer<ProducerState> removeProducer( const Pointer<ProducerId>& id );
void addConsumer( const Pointer<ConsumerInfo>& info ) {
checkShutdown();
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.cpp
Fri Feb 5 00:45:07 2010
@@ -17,6 +17,8 @@
#include "TransactionState.h"
+#include <activemq/state/ProducerState.h>
+
#include <decaf/lang/exceptions/IllegalStateException.h>
using namespace activemq;
@@ -62,3 +64,16 @@
__FILE__, __LINE__, "Transaction already Disposed" );
}
}
+
+////////////////////////////////////////////////////////////////////////////////
+void TransactionState::addProducerState( const Pointer<ProducerState>& producerState ) {
+
+ if( producerState != NULL ) {
+ producers.put( producerState->getInfo()->getProducerId(), producerState );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::vector< Pointer<ProducerState> > TransactionState::getProducerStates() {
+ return this->producers.values();
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/TransactionState.h
Fri Feb 5 00:45:07 2010
@@ -20,11 +20,13 @@
#include <activemq/util/Config.h>
#include <activemq/commands/Command.h>
+#include <activemq/commands/ProducerId.h>
#include <activemq/commands/TransactionId.h>
#include <decaf/lang/Pointer.h>
#include <decaf/util/StlList.h>
#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/util/concurrent/ConcurrentStlMap.h>
#include <string>
#include <memory>
@@ -35,8 +37,11 @@
using decaf::lang::Pointer;
using decaf::util::StlList;
using decaf::util::concurrent::atomic::AtomicBoolean;
+ using decaf::util::concurrent::ConcurrentStlMap;
using namespace activemq::commands;
+ class ProducerState;
+
class AMQCPP_API TransactionState {
private:
@@ -45,6 +50,8 @@
AtomicBoolean disposed;
bool prepared;
int preparedResult;
+ ConcurrentStlMap< Pointer<ProducerId>, Pointer<ProducerState>,
+ ProducerId::COMPARATOR > producers;
public:
@@ -86,6 +93,10 @@
return this->preparedResult;
}
+ void addProducerState( const Pointer<ProducerState>& producerState );
+
+ std::vector< Pointer<ProducerState> > getProducerStates();
+
};
}}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.cpp
Fri Feb 5 00:45:07 2010
@@ -51,14 +51,18 @@
this->useExponentialBackOff = true;
this->initialized = false;
this->maxReconnectAttempts = 0;
+ this->startupMaxReconnectAttempts = 0;
this->connectFailures = 0;
this->reconnectDelay = this->initialReconnectDelay;
this->trackMessages = false;
+ this->trackTransactionProducers = true;
this->maxCacheSize = 128 * 1024;
this->started = false;
this->closed = false;
this->connected = false;
+ this->connectionInterruptProcessingComplete = false;
+ this->firstConnection = true;
this->transportListener = NULL;
this->uris.reset( new URIPool() );
@@ -188,8 +192,8 @@
return;
}
- if( command->isRemoveInfo() ) {
- // Simulate response to RemoveInfo command
+ if( command->isRemoveInfo() || command->isMessageAck() ) {
+ // Simulate response to RemoveInfo command or Ack as they will be stale.
stateTracker.track( command );
Pointer<Response> response( new Response() );
response->setCorrelationId( command->getCommandId() );
@@ -332,6 +336,7 @@
stateTracker.setMaxCacheSize( this->getMaxCacheSize() );
stateTracker.setTrackMessages( this->isTrackMessages() );
+ stateTracker.setTrackTransactionProducers( this->isTrackTransactionProducers() );
if( connectedTransport != NULL ) {
stateTracker.restore( connectedTransport );
@@ -454,7 +459,6 @@
// Hand off to the close task so it gets done in a different thread.
closeTask->add( transport );
- taskRunner->wakeup();
synchronized( &reconnectMutex ) {
@@ -463,14 +467,19 @@
connectedTransportURI.reset( NULL );
connected = false;
+ // Notify before we attempt to reconnect so that the consumers have a chance
+ // to cleanup their state.
+ if( transportListener != NULL ) {
+ transportListener->transportInterrupted();
+ }
+
+ // Place the State Tracker into a reconnection state.
+ this->stateTracker.transportInterrupted();
+
if( started ) {
taskRunner->wakeup();
}
}
-
- if( transportListener != NULL ) {
- transportListener->transportInterrupted();
- }
}
}
@@ -578,6 +587,10 @@
transport->setTransportListener( disposedListener.get() );
}
+ try{
+ transport->stop();
+ } catch(...) {}
+
// Hand off to the close task so it gets done in a different thread
// this prevents a deadlock from occurring if the Transport happens
// to call back through our onException method or locks in some other
@@ -616,6 +629,10 @@
transportListener->transportResumed();
}
+ if( firstConnection ) {
+ firstConnection = false;
+ }
+
//std::cout << "Failover: Successfully connected to Broker at: "
// << connectedTransportURI->toString() << std::endl;
@@ -623,7 +640,17 @@
}
}
- if( maxReconnectAttempts > 0 && ++connectFailures >= maxReconnectAttempts ) {
+ int reconnectAttempts = 0;
+ if( firstConnection ) {
+ if( startupMaxReconnectAttempts != 0 ) {
+ reconnectAttempts = startupMaxReconnectAttempts;
+ }
+ }
+ if( reconnectAttempts == 0 ) {
+ reconnectAttempts = maxReconnectAttempts;
+ }
+
+ if( reconnectAttempts > 0 && ++connectFailures >= reconnectAttempts ) {
connectionFailure = failure;
// Make sure on initial startup, that the transportListener has been initialized
@@ -696,3 +723,11 @@
AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
AMQ_CATCHALL_THROW( IOException )
}
+
+////////////////////////////////////////////////////////////////////////////////
+void FailoverTransport::setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId ) {
+
+ synchronized( &reconnectMutex ) {
+ stateTracker.connectionInterruptProcessingComplete( this, connectionId );
+ }
+}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransport.h
Fri Feb 5 00:45:07 2010
@@ -21,6 +21,7 @@
#include <activemq/util/Config.h>
#include <activemq/commands/Command.h>
+#include <activemq/commands/ConnectionId.h>
#include <activemq/threads/TaskRunner.h>
#include <activemq/threads/CompositeTaskRunner.h>
#include <activemq/state/ConnectionStateTracker.h>
@@ -67,10 +68,14 @@
bool useExponentialBackOff;
bool initialized;
int maxReconnectAttempts;
+ int startupMaxReconnectAttempts;
int connectFailures;
long long reconnectDelay;
bool trackMessages;
+ bool trackTransactionProducers;
int maxCacheSize;
+ bool connectionInterruptProcessingComplete;
+ bool firstConnection;
mutable decaf::util::concurrent::Mutex reconnectMutex;
mutable decaf::util::concurrent::Mutex sleepMutex;
@@ -368,6 +373,14 @@
this->maxReconnectAttempts = value;
}
+ int getStartupMaxReconnectAttempts() const {
+ return this->startupMaxReconnectAttempts;
+ }
+
+ void setStartupMaxReconnectAttempts( int value ) {
+ this->startupMaxReconnectAttempts = value;
+ }
+
long long getReconnectDelay() const {
return this->reconnectDelay;
}
@@ -400,6 +413,14 @@
this->trackMessages = value;
}
+ bool isTrackTransactionProducers() const {
+ return this->trackTransactionProducers;
+ }
+
+ void setTrackTransactionProducers( bool value ) {
+ this->trackTransactionProducers = value;
+ }
+
int getMaxCacheSize() const {
return this->maxCacheSize;
}
@@ -408,6 +429,8 @@
this->maxCacheSize = value;
}
+ void setConnectionInterruptProcessingComplete( const Pointer<commands::ConnectionId>& connectionId );
+
protected:
/**
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=906751&r1=906750&r2=906751&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
Fri Feb 5 00:45:07 2010
@@ -95,6 +95,8 @@
Boolean::parseBoolean( properties.getProperty( "useExponentialBackOff", "true" ) ) );
transport->setMaxReconnectAttempts(
Integer::parseInt( properties.getProperty( "maxReconnectAttempts", "0" ) ) );
+ transport->setStartupMaxReconnectAttempts(
+ Integer::parseInt( properties.getProperty( "startupMaxReconnectAttempts", "0" ) ) );
transport->setRandomize(
Boolean::parseBoolean( properties.getProperty( "randomize", "true" ) ) );
transport->setBackup(