Author: tabish
Date: Wed Oct 20 20:46:54 2010
New Revision: 1025714
URL: http://svn.apache.org/viewvc?rev=1025714&view=rev
Log:
Refactored to internalize the member data and adds a failure state.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1025714&r1=1025713&r2=1025714&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
Wed Oct 20 20:46:54 2010
@@ -60,6 +60,38 @@ using namespace decaf::util::concurrent;
namespace activemq{
namespace core {
+ class ActiveMQConsumerMembers {
+ public:
+
+ cms::MessageListener* listener;
+ decaf::util::concurrent::Mutex listenerMutex;
+ AtomicBoolean deliveringAcks;
+ AtomicBoolean started;
+ Pointer<MessageDispatchChannel> unconsumedMessages;
+ decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch>
> dispatchedMessages;
+ long long lastDeliveredSequenceId;
+ Pointer<commands::MessageAck> pendingAck;
+ int deliveredCounter;
+ int additionalWindowSize;
+ volatile bool synchronizationRegistered;
+ bool clearDispatchList;
+ bool inProgressClearRequiredFlag;
+ long long redeliveryDelay;
+ Pointer<RedeliveryPolicy> redeliveryPolicy;
+ Pointer<Exception> failureError;
+
+ ActiveMQConsumerMembers() : listener(NULL),
+ lastDeliveredSequenceId(0),
+ deliveredCounter(0),
+ additionalWindowSize(0),
+ synchronizationRegistered(false),
+ clearDispatchList(false),
+ inProgressClearRequiredFlag(false),
+ redeliveryDelay(0) {
+ }
+
+ };
+
/**
* Class used to deal with consumers in an active transaction. This
* class calls back into the consumer when the transaction is Committed or
@@ -223,6 +255,8 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
"ActiveMQConsumer::ActiveMQConsumer - Destination given has no
Physical Name." );
}
+ this->internal = new ActiveMQConsumerMembers();
+
Pointer<ConsumerInfo> consumerInfo( new ConsumerInfo() );
consumerInfo->setConsumerId( id );
@@ -238,20 +272,20 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
// Initialize Consumer Data
this->session = session;
this->consumerInfo = consumerInfo;
- this->lastDeliveredSequenceId = -1;
- this->synchronizationRegistered = false;
- this->additionalWindowSize = 0;
- this->deliveredCounter = 0;
- this->clearDispatchList = false;
- this->inProgressClearRequiredFlag = false;
- this->listener = NULL;
- this->redeliveryDelay = 0;
- this->redeliveryPolicy.reset(
this->session->getConnection()->getRedeliveryPolicy()->clone() );
+ this->internal->lastDeliveredSequenceId = -1;
+ this->internal->synchronizationRegistered = false;
+ this->internal->additionalWindowSize = 0;
+ this->internal->deliveredCounter = 0;
+ this->internal->clearDispatchList = false;
+ this->internal->inProgressClearRequiredFlag = false;
+ this->internal->listener = NULL;
+ this->internal->redeliveryDelay = 0;
+ this->internal->redeliveryPolicy.reset(
this->session->getConnection()->getRedeliveryPolicy()->clone() );
if( this->session->getConnection()->isMessagePrioritySupported() ) {
- this->unconsumedMessages.reset( new
SimplePriorityMessageDispatchChannel() );
+ this->internal->unconsumedMessages.reset( new
SimplePriorityMessageDispatchChannel() );
} else {
- this->unconsumedMessages.reset( new FifoMessageDispatchChannel() );
+ this->internal->unconsumedMessages.reset( new
FifoMessageDispatchChannel() );
}
if( listener != NULL ) {
@@ -263,7 +297,12 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
ActiveMQConsumer::~ActiveMQConsumer() throw() {
try {
- close();
+
+ try{
+ this->close();
+ } catch(...) {}
+
+ delete this->internal;
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
@@ -272,24 +311,24 @@ ActiveMQConsumer::~ActiveMQConsumer() th
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::start() {
- if( this->unconsumedMessages->isClosed() ) {
+ if( this->internal->unconsumedMessages->isClosed() ) {
return;
}
- this->started.set( true );
- this->unconsumedMessages->start();
+ this->internal->started.set( true );
+ this->internal->unconsumedMessages->start();
this->session->wakeup();
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::stop() {
- this->started.set( false );
- this->unconsumedMessages->stop();
+ this->internal->started.set( false );
+ this->internal->unconsumedMessages->stop();
}
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::isClosed() const {
- return this->unconsumedMessages->isClosed();
+ return this->internal->unconsumedMessages->isClosed();
}
////////////////////////////////////////////////////////////////////////////////
@@ -333,7 +372,7 @@ void ActiveMQConsumer::doClose() {
deliverAcks();
}
- this->started.set( false );
+ this->internal->started.set( false );
// Identifies any errors encountered during shutdown.
bool haveException = false;
@@ -341,7 +380,7 @@ void ActiveMQConsumer::doClose() {
// Purge all the pending messages
try{
- unconsumedMessages->clear();
+ internal->unconsumedMessages->clear();
} catch ( ActiveMQException& ex ){
if( !haveException ){
ex.setMark( __FILE__, __LINE__ );
@@ -351,30 +390,30 @@ void ActiveMQConsumer::doClose() {
}
// Stop and Wakeup all sync consumers.
- unconsumedMessages->close();
+ internal->unconsumedMessages->close();
if( this->session->isIndividualAcknowledge() ) {
// For IndividualAck Mode we need to unlink the ack handler to
remove a
// cyclic reference to the MessageDispatch that brought the
message to us.
- synchronized( &dispatchedMessages ) {
- std::auto_ptr< Iterator< Pointer<MessageDispatch> > >
iter( this->dispatchedMessages.iterator() );
+ synchronized( &internal->dispatchedMessages ) {
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > >
iter( this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
iter->next()->getMessage()->setAckHandler(
Pointer<ActiveMQAckHandler>() );
}
- dispatchedMessages.clear();
+ this->internal->dispatchedMessages.clear();
}
}
// Remove this Consumer from the Connections set of Dispatchers
- this->session->removeConsumer(
this->consumerInfo->getConsumerId(), lastDeliveredSequenceId );
+ this->session->removeConsumer(
this->consumerInfo->getConsumerId(), this->internal->lastDeliveredSequenceId );
// Remove at the Broker Side, consumer has been removed from the
local
// Session and Connection objects so if the remote call to remove
throws
// it is okay to propagate to the client.
Pointer<RemoveInfo> info( new RemoveInfo );
info->setObjectId( this->consumerInfo->getConsumerId() );
- info->setLastDeliveredSequenceId( lastDeliveredSequenceId );
+ info->setLastDeliveredSequenceId(
this->internal->lastDeliveredSequenceId );
this->session->oneway( info );
// If we encountered an error, propagate it.
@@ -415,13 +454,17 @@ decaf::lang::Pointer<MessageDispatch> Ac
// Loop until the time is up or we get a non-expired message
while( true ) {
- Pointer<MessageDispatch> dispatch = unconsumedMessages->dequeue(
timeout );
+ Pointer<MessageDispatch> dispatch =
this->internal->unconsumedMessages->dequeue( timeout );
if( dispatch == NULL ) {
- if( timeout > 0 && !unconsumedMessages->isClosed() ) {
+ if( timeout > 0 &&
!this->internal->unconsumedMessages->isClosed() ) {
timeout = Math::max( deadline -
System::currentTimeMillis(), 0LL );
} else {
- return Pointer<MessageDispatch>();
+ if( this->internal->failureError != NULL ) {
+ throw
CMSExceptionSupport::create(*this->internal->failureError);
+ } else {
+ return Pointer<MessageDispatch>();
+ }
}
} else if( dispatch->getMessage() == NULL ) {
@@ -570,18 +613,18 @@ void ActiveMQConsumer::setMessageListene
session->stop();
}
- synchronized( &listenerMutex ) {
- this->listener = listener;
+ synchronized( &(this->internal->listenerMutex) ) {
+ this->internal->listener = listener;
}
- session->redispatch( *unconsumedMessages );
+ this->session->redispatch( *(this->internal->unconsumedMessages) );
if( wasStarted ) {
- session->start();
+ this->session->start();
}
} else {
- synchronized( &listenerMutex ) {
- this->listener = NULL;
+ synchronized( &(this->internal->listenerMutex) ) {
+ this->internal->listener = NULL;
}
}
}
@@ -601,14 +644,14 @@ void ActiveMQConsumer::beforeMessageIsCo
dispatch->getMessage()->setAckHandler( ackHandler );
}
- this->lastDeliveredSequenceId =
+ this->internal->lastDeliveredSequenceId =
dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
if( !isAutoAcknowledgeBatch() ) {
// When not in an Auto
- synchronized( &dispatchedMessages ) {
- dispatchedMessages.enqueueFront( dispatch );
+ synchronized( &internal->dispatchedMessages ) {
+ internal->dispatchedMessages.enqueueFront( dispatch );
}
if( this->session->isTransacted() ) {
@@ -623,7 +666,7 @@ void ActiveMQConsumer::afterMessageIsCon
try{
- if( unconsumedMessages->isClosed() ) {
+ if( internal->unconsumedMessages->isClosed() ) {
return;
}
@@ -635,21 +678,21 @@ void ActiveMQConsumer::afterMessageIsCon
return;
} else if( isAutoAcknowledgeEach() ) {
- if( this->deliveringAcks.compareAndSet( false, true ) ) {
+ if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
- synchronized( &dispatchedMessages ) {
- if( !dispatchedMessages.empty() ) {
+ synchronized( &internal->dispatchedMessages ) {
+ if( !internal->dispatchedMessages.empty() ) {
Pointer<MessageAck> ack =
makeAckForAllDeliveredMessages(
ActiveMQConstants::ACK_TYPE_CONSUMED );
if( ack != NULL ) {
- dispatchedMessages.clear();
+ internal->dispatchedMessages.clear();
session->oneway( ack );
}
}
}
- this->deliveringAcks.set( false );
+ this->internal->deliveringAcks.set( false );
}
} else if( isAutoAcknowledgeBatch() ) {
@@ -659,8 +702,8 @@ void ActiveMQConsumer::afterMessageIsCon
bool messageUnackedByConsumer = false;
- synchronized( &dispatchedMessages ) {
- std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->dispatchedMessages.iterator() );
+ synchronized( &internal->dispatchedMessages ) {
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
if( iter->next() == message ) {
messageUnackedByConsumer = true;
@@ -689,25 +732,25 @@ void ActiveMQConsumer::deliverAcks() {
Pointer<MessageAck> ack;
- if( this->deliveringAcks.compareAndSet( false, true ) ) {
+ if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
if( isAutoAcknowledgeEach() ) {
- synchronized( &dispatchedMessages ) {
+ synchronized( &internal->dispatchedMessages ) {
ack = makeAckForAllDeliveredMessages(
ActiveMQConstants::ACK_TYPE_CONSUMED );
if( ack != NULL ) {
- dispatchedMessages.clear();
+ internal->dispatchedMessages.clear();
} else {
- ack.swap( pendingAck );
+ ack.swap( internal->pendingAck );
}
}
- } else if( pendingAck != NULL &&
- pendingAck->getAckType() ==
ActiveMQConstants::ACK_TYPE_CONSUMED ) {
+ } else if( internal->pendingAck != NULL &&
+ internal->pendingAck->getAckType() ==
ActiveMQConstants::ACK_TYPE_CONSUMED ) {
- ack.swap( pendingAck );
+ ack.swap( internal->pendingAck );
}
if( ack != NULL ) {
@@ -717,7 +760,7 @@ void ActiveMQConsumer::deliverAcks() {
} catch(...) {}
} else {
- this->deliveringAcks.set( false );
+ this->internal->deliveringAcks.set( false );
}
}
}
@@ -733,8 +776,8 @@ void ActiveMQConsumer::ackLater( const P
// consumer got the message to expand the pre-fetch window
if( session->isTransacted() ) {
session->doStartTransaction();
- if( !synchronizationRegistered ) {
- synchronizationRegistered = true;
+ if( !internal->synchronizationRegistered ) {
+ internal->synchronizationRegistered = true;
Pointer<Synchronization> sync( new TransactionSynhcronization(
this ) );
this->session->getTransactionContext()->addSynchronization( sync );
@@ -743,20 +786,20 @@ void ActiveMQConsumer::ackLater( const P
// The delivered message list is only needed for the recover method
// which is only used with client ack.
- deliveredCounter++;
+ internal->deliveredCounter++;
- Pointer<MessageAck> oldPendingAck = pendingAck;
- pendingAck.reset( new MessageAck() );
- pendingAck->setConsumerId( dispatch->getConsumerId() );
- pendingAck->setAckType( (unsigned char)ackType );
- pendingAck->setDestination( dispatch->getDestination() );
- pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId() );
- pendingAck->setMessageCount( deliveredCounter );
+ Pointer<MessageAck> oldPendingAck = internal->pendingAck;
+ internal->pendingAck.reset( new MessageAck() );
+ internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
+ internal->pendingAck->setAckType( (unsigned char)ackType );
+ internal->pendingAck->setDestination( dispatch->getDestination() );
+ internal->pendingAck->setLastMessageId(
dispatch->getMessage()->getMessageId() );
+ internal->pendingAck->setMessageCount( internal->deliveredCounter );
if( oldPendingAck == NULL ) {
- pendingAck->setFirstMessageId( pendingAck->getLastMessageId() );
- } else if ( oldPendingAck->getAckType() == pendingAck->getAckType() ) {
- pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId() );
+ internal->pendingAck->setFirstMessageId(
internal->pendingAck->getLastMessageId() );
+ } else if ( oldPendingAck->getAckType() ==
internal->pendingAck->getAckType() ) {
+ internal->pendingAck->setFirstMessageId(
oldPendingAck->getFirstMessageId() );
} else {
// old pending ack being superseded by ack of another type, if is is
not a delivered
// ack and hence important, send it now so it is not lost.
@@ -766,33 +809,33 @@ void ActiveMQConsumer::ackLater( const P
}
if( session->isTransacted() ) {
- pendingAck->setTransactionId(
this->session->getTransactionContext()->getTransactionId() );
+ internal->pendingAck->setTransactionId(
this->session->getTransactionContext()->getTransactionId() );
}
- if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( deliveredCounter
- additionalWindowSize ) ) {
- session->oneway( pendingAck );
- pendingAck.reset( NULL );
- deliveredCounter = 0;
- additionalWindowSize = 0;
+ if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= (
internal->deliveredCounter - internal->additionalWindowSize ) ) {
+ session->oneway( this->internal->pendingAck );
+ internal->pendingAck.reset( NULL );
+ internal->deliveredCounter = 0;
+ internal->additionalWindowSize = 0;
}
}
////////////////////////////////////////////////////////////////////////////////
Pointer<MessageAck> ActiveMQConsumer::makeAckForAllDeliveredMessages( int type
) {
- synchronized( &dispatchedMessages ) {
+ synchronized( &internal->dispatchedMessages ) {
- if( !dispatchedMessages.empty() ) {
+ if( !internal->dispatchedMessages.empty() ) {
- Pointer<MessageDispatch> dispatched = dispatchedMessages.front();
+ Pointer<MessageDispatch> dispatched =
internal->dispatchedMessages.front();
Pointer<MessageAck> ack( new MessageAck() );
ack->setAckType( (unsigned char)type );
ack->setConsumerId( dispatched->getConsumerId() );
ack->setDestination( dispatched->getDestination() );
- ack->setMessageCount( (int)dispatchedMessages.size() );
+ ack->setMessageCount( (int)internal->dispatchedMessages.size() );
ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
- ack->setFirstMessageId(
dispatchedMessages.back()->getMessage()->getMessageId() );
+ ack->setFirstMessageId(
internal->dispatchedMessages.back()->getMessage()->getMessageId() );
return ack;
}
@@ -820,8 +863,8 @@ void ActiveMQConsumer::acknowledge( cons
session->oneway( ack );
- synchronized( &dispatchedMessages ) {
- std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->dispatchedMessages.iterator() );
+ synchronized( &internal->dispatchedMessages ) {
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
if( iter->next() == dispatch ) {
iter->remove();
@@ -844,7 +887,7 @@ void ActiveMQConsumer::acknowledge() {
try{
- synchronized( &dispatchedMessages ) {
+ synchronized( &internal->dispatchedMessages ) {
// Acknowledge all messages so far.
Pointer<MessageAck> ack =
@@ -860,14 +903,16 @@ void ActiveMQConsumer::acknowledge() {
}
session->oneway( ack );
- pendingAck.reset( NULL );
+ this->internal->pendingAck.reset( NULL );
// Adjust the counters
- deliveredCounter = Math::max( 0, deliveredCounter -
(int)dispatchedMessages.size());
- additionalWindowSize = Math::max(0, additionalWindowSize -
(int)dispatchedMessages.size());
+ this->internal->deliveredCounter =
+ Math::max( 0, internal->deliveredCounter -
(int)internal->dispatchedMessages.size());
+ this->internal->additionalWindowSize =
+ Math::max(0, internal->additionalWindowSize -
(int)internal->dispatchedMessages.size());
if( !session->isTransacted() ) {
- dispatchedMessages.clear();
+ this->internal->dispatchedMessages.clear();
}
}
}
@@ -877,43 +922,43 @@ void ActiveMQConsumer::acknowledge() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::commit() {
- synchronized( &dispatchedMessages ) {
- dispatchedMessages.clear();
+ synchronized( &(this->internal->dispatchedMessages) ) {
+ this->internal->dispatchedMessages.clear();
}
- redeliveryDelay = 0;
+ this->internal->redeliveryDelay = 0;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::rollback() {
- synchronized( unconsumedMessages.get() ) {
+ synchronized( internal->unconsumedMessages.get() ) {
- synchronized( &dispatchedMessages ) {
- if( dispatchedMessages.empty() ) {
+ synchronized( &internal->dispatchedMessages ) {
+ if( internal->dispatchedMessages.empty() ) {
return;
}
// Only increase the redelivery delay after the first redelivery..
- Pointer<MessageDispatch> lastMsg = dispatchedMessages.front();
+ Pointer<MessageDispatch> lastMsg =
internal->dispatchedMessages.front();
const int currentRedeliveryCount =
lastMsg->getMessage()->getRedeliveryCounter();
if( currentRedeliveryCount > 0 ) {
- redeliveryDelay =
this->redeliveryPolicy->getNextRedeliveryDelay( redeliveryDelay );
+ internal->redeliveryDelay =
this->internal->redeliveryPolicy->getNextRedeliveryDelay(
internal->redeliveryDelay );
} else {
- redeliveryDelay =
this->redeliveryPolicy->getInitialRedeliveryDelay();
+ internal->redeliveryDelay =
this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
}
Pointer<MessageId> firstMsgId =
- dispatchedMessages.back()->getMessage()->getMessageId();
+
internal->dispatchedMessages.back()->getMessage()->getMessageId();
- std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
dispatchedMessages.iterator() );
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
Pointer<Message> message = iter->next()->getMessage();
message->setRedeliveryCounter( message->getRedeliveryCounter()
+ 1 );
}
- if( this->redeliveryPolicy->getMaximumRedeliveries() !=
RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
- lastMsg->getRedeliveryCounter() >
this->redeliveryPolicy->getMaximumRedeliveries() ) {
+ if( this->internal->redeliveryPolicy->getMaximumRedeliveries() !=
RedeliveryPolicy::NO_MAXIMUM_REDELIVERIES &&
+ lastMsg->getRedeliveryCounter() >
this->internal->redeliveryPolicy->getMaximumRedeliveries() ) {
// We need to NACK the messages so that they get sent to the
DLQ.
// Acknowledge the last message.
@@ -921,15 +966,15 @@ void ActiveMQConsumer::rollback() {
ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( lastMsg->getDestination() );
- ack->setMessageCount( (int)dispatchedMessages.size() );
+ ack->setMessageCount( (int)internal->dispatchedMessages.size()
);
ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
ack->setFirstMessageId( firstMsgId );
session->oneway( ack );
// Adjust the window size.
- additionalWindowSize =
- Math::max( 0, additionalWindowSize -
(int)dispatchedMessages.size() );
- redeliveryDelay = 0;
+ internal->additionalWindowSize =
+ Math::max( 0, internal->additionalWindowSize -
(int)internal->dispatchedMessages.size() );
+ internal->redeliveryDelay = 0;
} else {
@@ -939,7 +984,7 @@ void ActiveMQConsumer::rollback() {
ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
ack->setConsumerId( this->consumerInfo->getConsumerId() );
ack->setDestination( lastMsg->getDestination() );
- ack->setMessageCount( (int)dispatchedMessages.size() );
+ ack->setMessageCount(
(int)internal->dispatchedMessages.size() );
ack->setLastMessageId(
lastMsg->getMessage()->getMessageId() );
ack->setFirstMessageId( firstMsgId );
@@ -947,15 +992,15 @@ void ActiveMQConsumer::rollback() {
}
// stop the delivery of messages.
- unconsumedMessages->stop();
+ internal->unconsumedMessages->stop();
- std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
dispatchedMessages.iterator() );
+ std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
while( iter->hasNext() ) {
- unconsumedMessages->enqueueFirst( iter->next() );
+ internal->unconsumedMessages->enqueueFirst( iter->next() );
}
- if (redeliveryDelay > 0 && !unconsumedMessages->isClosed()) {
+ if( internal->redeliveryDelay > 0 &&
!internal->unconsumedMessages->isClosed() ) {
// TODO
// Start up the delivery again a little later.
//scheduler.executeAfterDelay(new Runnable() {
@@ -975,13 +1020,13 @@ void ActiveMQConsumer::rollback() {
}
}
- deliveredCounter -= (int)dispatchedMessages.size();
- dispatchedMessages.clear();
+ internal->deliveredCounter -=
(int)internal->dispatchedMessages.size();
+ internal->dispatchedMessages.clear();
}
}
- if( this->listener != NULL ) {
- session->redispatch( *unconsumedMessages );
+ if( this->internal->listener != NULL ) {
+ session->redispatch( *internal->unconsumedMessages );
}
}
@@ -990,17 +1035,17 @@ void ActiveMQConsumer::dispatch( const P
try {
- synchronized( unconsumedMessages.get() ) {
+ synchronized( internal->unconsumedMessages.get() ) {
clearMessagesInProgress();
- if( this->clearDispatchList ) {
+ if( this->internal->clearDispatchList ) {
// we are reconnecting so lets flush the in progress
// messages
- clearDispatchList = false;
- unconsumedMessages->clear();
+ internal->clearDispatchList = false;
+ internal->unconsumedMessages->clear();
}
- if( !unconsumedMessages->isClosed() ) {
+ if( !internal->unconsumedMessages->isClosed() ) {
// Don't dispatch expired messages, ack it and then destroy it
if( dispatch->getMessage()->isExpired() ) {
@@ -1010,15 +1055,15 @@ void ActiveMQConsumer::dispatch( const P
return;
}
- synchronized( &listenerMutex ) {
+ synchronized( &internal->listenerMutex ) {
// If we have a listener, send the message.
- if( this->listener != NULL &&
unconsumedMessages->isRunning() ) {
+ if( this->internal->listener != NULL &&
internal->unconsumedMessages->isRunning() ) {
// Preprocessing.
beforeMessageIsConsumed( dispatch );
// Notify the listener
- this->listener->onMessage(
+ this->internal->listener->onMessage(
dynamic_cast<cms::Message*>(
dispatch->getMessage().get() ) );
// Postprocessing
@@ -1027,7 +1072,7 @@ void ActiveMQConsumer::dispatch( const P
} else {
// No listener, add it to the unconsumed messages list
- this->unconsumedMessages->enqueue( dispatch );
+ this->internal->unconsumedMessages->enqueue( dispatch
);
}
}
}
@@ -1046,7 +1091,7 @@ void ActiveMQConsumer::sendPullRequest(
this->checkClosed();
// There are still local message, consume them first.
- if( !this->unconsumedMessages->isEmpty() ) {
+ if( !this->internal->unconsumedMessages->isEmpty() ) {
return;
}
@@ -1077,16 +1122,16 @@ void ActiveMQConsumer::checkClosed() con
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQConsumer::iterate() {
- synchronized( &listenerMutex ) {
+ synchronized( &internal->listenerMutex ) {
- if( this->listener != NULL ) {
+ if( this->internal->listener != NULL ) {
- Pointer<MessageDispatch> dispatch =
unconsumedMessages->dequeueNoWait();
+ Pointer<MessageDispatch> dispatch =
internal->unconsumedMessages->dequeueNoWait();
if( dispatch != NULL ) {
try {
beforeMessageIsConsumed( dispatch );
- this->listener->onMessage(
+ this->internal->listener->onMessage(
dynamic_cast<cms::Message*>(
dispatch->getMessage().get() ) );
afterMessageIsConsumed( dispatch, false );
} catch( ActiveMQException& ex ) {
@@ -1104,22 +1149,22 @@ bool ActiveMQConsumer::iterate() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::inProgressClearRequired() {
- inProgressClearRequiredFlag = true;
+ this->internal->inProgressClearRequiredFlag = true;
// Clears dispatched messages async to avoid lock contention with
inprogress acks.
- clearDispatchList = true;
+ this->internal->clearDispatchList = true;
}
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::clearMessagesInProgress() {
- if( inProgressClearRequiredFlag ) {
- synchronized( unconsumedMessages.get() ) {
- if( inProgressClearRequiredFlag ) {
+ if( this->internal->inProgressClearRequiredFlag ) {
+ synchronized( this->internal->unconsumedMessages.get() ) {
+ if( this->internal->inProgressClearRequiredFlag ) {
// TODO - Rollback duplicates.
// allow dispatch on this connection to resume
this->session->getConnection()->setTransportInterruptionProcessingComplete();
- inProgressClearRequiredFlag = false;
+ this->internal->inProgressClearRequiredFlag = false;
}
}
}
@@ -1138,7 +1183,7 @@ bool ActiveMQConsumer::isAutoAcknowledge
////////////////////////////////////////////////////////////////////////////////
int ActiveMQConsumer::getMessageAvailableCount() const {
- return this->unconsumedMessages->size();
+ return this->internal->unconsumedMessages->size();
}
////////////////////////////////////////////////////////////////////////////////
@@ -1219,11 +1264,64 @@ void ActiveMQConsumer::applyDestinationO
////////////////////////////////////////////////////////////////////////////////
void ActiveMQConsumer::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
if( policy != NULL ) {
- this->redeliveryPolicy.reset( policy );
+ this->internal->redeliveryPolicy.reset( policy );
}
}
////////////////////////////////////////////////////////////////////////////////
RedeliveryPolicy* ActiveMQConsumer::getRedeliveryPolicy() const {
- return this->redeliveryPolicy.get();
+ return this->internal->redeliveryPolicy.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MessageListener* ActiveMQConsumer::getMessageListener() const {
+ return this->internal->listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const Pointer<commands::ConsumerInfo>& ActiveMQConsumer::getConsumerInfo()
const {
+ this->checkClosed();
+ return this->consumerInfo;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const Pointer<commands::ConsumerId>& ActiveMQConsumer::getConsumerId() const {
+ this->checkClosed();
+ return this->consumerInfo->getConsumerId();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumer::isSynchronizationRegistered() const {
+ return this->internal->synchronizationRegistered;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setSynchronizationRegistered( bool value ) {
+ this->internal->synchronizationRegistered = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConsumer::getLastDeliveredSequenceId() const {
+ return this->internal->lastDeliveredSequenceId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setLastDeliveredSequenceId( long long value ) {
+ this->internal->lastDeliveredSequenceId = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setFailureError( decaf::lang::Exception* error ) {
+ if( error != NULL ) {
+ this->internal->failureError.reset( error->clone() );
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Exception* ActiveMQConsumer::getFailureError() const {
+ if( this->internal->failureError == NULL ) {
+ return NULL;
+ }
+
+ return this->internal->failureError.get();
}
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL:
http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1025714&r1=1025713&r2=1025714&view=diff
==============================================================================
---
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
(original)
+++
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
Wed Oct 20 20:46:54 2010
@@ -43,6 +43,7 @@ namespace core{
using decaf::util::concurrent::atomic::AtomicBoolean;
class ActiveMQSession;
+ class ActiveMQConsumerMembers;
class AMQCPP_API ActiveMQConsumer : public cms::MessageConsumer,
public Dispatcher
@@ -50,89 +51,19 @@ namespace core{
private:
/**
- * The session that owns this Consumer
+ * Internal Class that holds Members of this class, allows for changes
without API breakage.
*/
- ActiveMQSession* session;
-
- /**
- * The Consumer info for this Consumer
- */
- Pointer<commands::ConsumerInfo> consumerInfo;
-
- /**
- * The Message Listener for this Consumer
- */
- cms::MessageListener* listener;
-
- /**
- * Mutex to Protect access to the listener during delivery.
- */
- decaf::util::concurrent::Mutex listenerMutex;
-
- /**
- * Is the consumer currently delivering acks.
- */
- AtomicBoolean deliveringAcks;
-
- /**
- * Has this Consumer been started yet.
- */
- AtomicBoolean started;
-
- /**
- * Queue of unconsumed messages.
- */
- Pointer<MessageDispatchChannel> unconsumedMessages;
-
- /**
- * Queue of consumed messages.
- */
- decaf::util::StlQueue< decaf::lang::Pointer<commands::MessageDispatch>
> dispatchedMessages;
-
- /**
- * The last delivered message's BrokerSequenceId.
- */
- long long lastDeliveredSequenceId;
-
- /**
- * Next Ack to go out.
- */
- Pointer<commands::MessageAck> pendingAck;
-
- /**
- * How many message's have been delivered so far since the last Ack
was sent.
- */
- int deliveredCounter;
-
- /**
- * How big to grow the ack window next time.
- */
- int additionalWindowSize;
-
- /**
- * Has the Synchronization been added for this transaction
- */
- volatile bool synchronizationRegistered;
-
- /**
- * Boolean indicating if in progress messages should be cleared.
- */
- bool clearDispatchList;
+ ActiveMQConsumerMembers* internal;
/**
- * Indicates if inprogress messages are to be cleared.
+ * The ActiveMQSession that owns this class instance.
*/
- bool inProgressClearRequiredFlag;
-
- /**
- * The redelivery delay used for the last set of redeliveries.
- */
- long long redeliveryDelay;
+ ActiveMQSession* session;
/**
- * The policy to use when Message Redelivery is in progress.
+ * The ConsumerInfo object for this class instance.
*/
- Pointer<RedeliveryPolicy> redeliveryPolicy;
+ Pointer<commands::ConsumerInfo> consumerInfo;
private:
@@ -174,9 +105,7 @@ namespace core{
virtual void setMessageListener( cms::MessageListener* listener );
- virtual cms::MessageListener* getMessageListener() const {
- return this->listener;
- }
+ virtual cms::MessageListener* getMessageListener() const;
virtual std::string getMessageSelector() const;
@@ -220,19 +149,13 @@ namespace core{
* Get the Consumer information for this consumer
* @return Reference to a Consumer Info Object
*/
- const Pointer<commands::ConsumerInfo>& getConsumerInfo() const {
- this->checkClosed();
- return this->consumerInfo;
- }
+ const Pointer<commands::ConsumerInfo>& getConsumerInfo() const;
/**
* Get the Consumer Id for this consumer
* @return Reference to a Consumer Id Object
*/
- const Pointer<commands::ConsumerId>& getConsumerId() const {
- this->checkClosed();
- return this->consumerInfo->getConsumerId();
- }
+ const Pointer<commands::ConsumerId>& getConsumerId() const;
/**
* @returns if this Consumer has been closed.
@@ -243,17 +166,13 @@ namespace core{
* Has this Consumer Transaction Synchronization been added to the
transaction
* @return true if the synchronization has been added.
*/
- bool isSynchronizationRegistered() const {
- return this->synchronizationRegistered;
- }
+ bool isSynchronizationRegistered() const ;
/**
* Sets the Synchronization Registered state of this consumer.
* @param value - true if registered false otherwise.
*/
- void setSynchronizationRegistered( bool value ) {
- this->synchronizationRegistered = value;
- }
+ void setSynchronizationRegistered( bool value );
/**
* Deliver any pending messages to the registered MessageListener if
there
@@ -285,9 +204,7 @@ namespace core{
*
* @returns long long containing the sequence id of the last delivered
Message.
*/
- long long getLastDeliveredSequenceId() const {
- return this->lastDeliveredSequenceId;
- }
+ long long getLastDeliveredSequenceId() const;
/**
* Sets the value of the Last Delivered Sequence Id
@@ -295,9 +212,7 @@ namespace core{
* @param value
* The new value to assign to the Last Delivered Sequence Id
property.
*/
- void setLastDeliveredSequenceId( long long value ) {
- this->lastDeliveredSequenceId = value;
- }
+ void setLastDeliveredSequenceId( long long value );
/**
* @returns the number of Message's this consumer is waiting to
Dispatch.
@@ -323,6 +238,22 @@ namespace core{
*/
RedeliveryPolicy* getRedeliveryPolicy() const;
+ /**
+ * Sets the Exception that has caused this Consumer to be in a failed
state.
+ *
+ * @param error
+ * The error that is to be thrown when a Receive call is made.
+ */
+ void setFailureError( decaf::lang::Exception* error );
+
+ /**
+ * Gets the error that caused this Consumer to be in a Failed state,
or NULL if
+ * there is no Error.
+ *
+ * @returns pointer to the error that faulted this Consumer or NULL.
+ */
+ decaf::lang::Exception* getFailureError() const;
+
protected:
/**