Author: tabish Date: Mon Aug 13 18:30:02 2012 New Revision: 1372538 URL: http://svn.apache.org/viewvc?rev=1372538&view=rev Log: Switch to LinkedList and read / write lock for better stability.
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1372538&r1=1372537&r2=1372538&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Aug 13 18:30:02 2012 @@ -163,7 +163,8 @@ namespace core{ DispatcherMap dispatchers; ProducerMap activeProducers; - decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQSessionKernel> > activeSessions; + decaf::util::concurrent::locks::ReentrantReadWriteLock sessionsLock; + decaf::util::LinkedList< Pointer<ActiveMQSessionKernel> > activeSessions; decaf::util::LinkedList<transport::TransportListener*> transportListeners; TempDestinationMap activeTempDestinations; @@ -205,6 +206,7 @@ namespace core{ firstFailureError(), dispatchers(), activeProducers(), + sessionsLock(), activeSessions(), transportListeners(), activeTempDestinations() { @@ -430,8 +432,13 @@ Pointer<SessionId> ActiveMQConnection::g //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::addSession(Pointer<ActiveMQSessionKernel> session) { try { - synchronized(&this->config->activeSessions) { + this->config->sessionsLock.writeLock().lock(); + try { this->config->activeSessions.add(session); + this->config->sessionsLock.writeLock().unlock(); + } catch (Exception& ex) { + this->config->sessionsLock.writeLock().unlock(); + throw; } } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() @@ -440,8 +447,13 @@ void ActiveMQConnection::addSession(Poin //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::removeSession(Pointer<ActiveMQSessionKernel> session) { try { - synchronized(&this->config->activeSessions) { + this->config->sessionsLock.writeLock().lock(); + try { this->config->activeSessions.remove(session); + this->config->sessionsLock.writeLock().unlock(); + } catch (Exception& ex) { + this->config->sessionsLock.writeLock().unlock(); + throw; } } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() @@ -629,17 +641,24 @@ void ActiveMQConnection::cleanup() { try { - // Get the complete list of active sessions. - std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() ); + this->config->sessionsLock.readLock().lock(); + try { + // Get the complete list of active sessions. + std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() ); - // Dispose of all the Session resources we know are still open. - while (iter->hasNext()) { - Pointer<ActiveMQSessionKernel> session = iter->next(); - try{ - session->dispose(); - } catch( cms::CMSException& ex ){ - /* Absorb */ + // Dispose of all the Session resources we know are still open. + while (iter->hasNext()) { + Pointer<ActiveMQSessionKernel> session = iter->next(); + try{ + session->dispose(); + } catch( cms::CMSException& ex ){ + /* Absorb */ + } } + this->config->sessionsLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->sessionsLock.readLock().unlock(); + throw; } if (this->config->isConnectionInfoSentToBroker) { @@ -668,19 +687,24 @@ void ActiveMQConnection::start() { checkClosedOrFailed(); ensureConnectionInfoSent(); - // This starts or restarts the delivery of all incoming messages - // messages delivered while this connection is stopped are dropped - // and not acknowledged. - if (this->started.compareAndSet(false, true)) { - - synchronized(&this->config->activeSessions) { + try { + // This starts or restarts the delivery of all incoming messages + // messages delivered while this connection is stopped are dropped + // and not acknowledged. + if (this->started.compareAndSet(false, true)) { + this->config->sessionsLock.readLock().lock(); // Start all the sessions. std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator()); while (iter->hasNext()) { iter->next()->start(); } + + this->config->sessionsLock.readLock().unlock(); } + } catch (Exception& ex) { + this->config->sessionsLock.readLock().unlock(); + throw; } } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() @@ -693,16 +717,21 @@ void ActiveMQConnection::stop() { checkClosedOrFailed(); - // Once current deliveries are done this stops the delivery of any - // new messages. - if (this->started.compareAndSet(true, false)) { - synchronized(&this->config->activeSessions) { + try { + // Once current deliveries are done this stops the delivery of any + // new messages. + if (this->started.compareAndSet(true, false)) { + this->config->sessionsLock.readLock().lock(); std::auto_ptr<Iterator< Pointer<ActiveMQSessionKernel> > > iter(this->config->activeSessions.iterator()); while (iter->hasNext()) { iter->next()->stop(); } + this->config->sessionsLock.readLock().unlock(); } + } catch (Exception& ex) { + this->config->sessionsLock.readLock().unlock(); + throw; } } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() @@ -942,16 +971,23 @@ void ActiveMQConnection::onConsumerContr Pointer<ConsumerControl> consumerControl = command.dynamicCast<ConsumerControl>(); - // Get the complete list of active sessions. - std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() ); + this->config->sessionsLock.readLock().lock(); + try { + // Get the complete list of active sessions. + std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > iter( this->config->activeSessions.iterator() ); - while (iter->hasNext()) { - Pointer<ActiveMQSessionKernel> session = iter->next(); - if (consumerControl->isClose()) { - session->close(consumerControl->getConsumerId()); - } else { - session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch()); + while (iter->hasNext()) { + Pointer<ActiveMQSessionKernel> session = iter->next(); + if (consumerControl->isClose()) { + session->close(consumerControl->getConsumerId()); + } else { + session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch()); + } } + this->config->sessionsLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->sessionsLock.readLock().unlock(); + throw; } } @@ -1001,11 +1037,16 @@ void ActiveMQConnection::transportInterr this->config->transportInterruptionProcessingComplete.reset( new CountDownLatch( (int)this->config->dispatchers.size() ) ); - synchronized(&this->config->activeSessions) { + this->config->sessionsLock.readLock().lock(); + try { std::auto_ptr< Iterator< Pointer<ActiveMQSessionKernel> > > sessions(this->config->activeSessions.iterator()); while (sessions->hasNext()) { sessions->next()->clearMessagesInProgress(); } + this->config->sessionsLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->sessionsLock.readLock().unlock(); + throw; } synchronized(&this->config->transportListeners) { @@ -1490,12 +1531,20 @@ void ActiveMQConnection::deleteTempDesti checkClosedOrFailed(); ensureConnectionInfoSent(); - Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator()); - while (iterator->hasNext()) { - Pointer<ActiveMQSessionKernel> session = iterator->next(); - if (session->isInUse(destination)) { - throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination"); + this->config->sessionsLock.readLock().lock(); + try { + Pointer< Iterator< Pointer<ActiveMQSessionKernel> > > iterator(this->config->activeSessions.iterator()); + while (iterator->hasNext()) { + Pointer<ActiveMQSessionKernel> session = iterator->next(); + if (session->isInUse(destination)) { + this->config->sessionsLock.readLock().unlock(); + throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination"); + } } + this->config->sessionsLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->sessionsLock.readLock().unlock(); + throw; } this->config->activeTempDestinations.remove(destination); Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1372538&r1=1372537&r2=1372538&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon Aug 13 18:30:02 2012 @@ -56,9 +56,10 @@ #include <decaf/lang/Long.h> #include <decaf/lang/Math.h> #include <decaf/util/Queue.h> +#include <decaf/util/LinkedList.h> #include <decaf/util/concurrent/Mutex.h> -#include <decaf/util/concurrent/CopyOnWriteArrayList.h> #include <decaf/util/concurrent/atomic/AtomicBoolean.h> +#include <decaf/util/concurrent/locks/ReentrantReadWriteLock.h> #include <decaf/lang/exceptions/InvalidStateException.h> #include <decaf/lang/exceptions/NullPointerException.h> @@ -92,8 +93,10 @@ namespace kernels{ public: AtomicBoolean synchronizationRegistered; - decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers; - decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQConsumerKernel> > consumers; + decaf::util::concurrent::locks::ReentrantReadWriteLock producerLock; + decaf::util::LinkedList< Pointer<ActiveMQProducerKernel> > producers; + decaf::util::concurrent::locks::ReentrantReadWriteLock consumerLock; + decaf::util::LinkedList< Pointer<ActiveMQConsumerKernel> > consumers; Pointer<Scheduler> scheduler; Pointer<CloseSynhcronization> closeSync; Mutex sendMutex; @@ -102,8 +105,8 @@ namespace kernels{ public: SessionConfig() : synchronizationRegistered(false), - producers(), consumers(), scheduler(), closeSync(), - sendMutex(), transformer(NULL) {} + producerLock(), producers(), consumerLock(), consumers(), + scheduler(), closeSync(), sendMutex(), transformer(NULL) {} ~SessionConfig() {} }; @@ -339,31 +342,45 @@ void ActiveMQSessionKernel::dispose() { } // Dispose of all Consumers, the dispose method skips the RemoveInfo command. - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > consumerIter(this->config->consumers.iterator()); - while (consumerIter->hasNext()) { - try{ - Pointer<ActiveMQConsumerKernel> consumer = consumerIter->next(); - consumer->setFailureError(this->connection->getFirstFailureError()); - consumer->dispose(); - this->lastDeliveredSequenceId = - Math::max(this->lastDeliveredSequenceId, consumer->getLastDeliveredSequenceId()); - } catch (cms::CMSException& ex) { - /* Absorb */ + this->config->consumerLock.writeLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > consumerIter(this->config->consumers.iterator()); + while (consumerIter->hasNext()) { + try{ + Pointer<ActiveMQConsumerKernel> consumer = consumerIter->next(); + consumer->setFailureError(this->connection->getFirstFailureError()); + consumer->dispose(); + this->lastDeliveredSequenceId = + Math::max(this->lastDeliveredSequenceId, consumer->getLastDeliveredSequenceId()); + } catch (cms::CMSException& ex) { + /* Absorb */ + } } + this->config->consumers.clear(); + this->config->consumerLock.writeLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.writeLock().unlock(); + throw; } - this->config->consumers.clear(); - // Dispose of all Producers, the dispose method skips the RemoveInfo command. - std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator()); + this->config->producerLock.writeLock().lock(); + try { + // Dispose of all Producers, the dispose method skips the RemoveInfo command. + std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator()); - while (producerIter->hasNext()) { - try{ - producerIter->next()->dispose(); - } catch (cms::CMSException& ex) { - /* Absorb */ + while (producerIter->hasNext()) { + try{ + producerIter->next()->dispose(); + } catch (cms::CMSException& ex) { + /* Absorb */ + } } + this->config->producers.clear(); + this->config->producerLock.writeLock().unlock(); + } catch (Exception& ex) { + this->config->producerLock.writeLock().unlock(); + throw; } - this->config->producers.clear(); } AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException ) @@ -417,10 +434,17 @@ void ActiveMQSessionKernel::recover() { throw cms::IllegalStateException("This session is transacted"); } - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - consumer->rollback(); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + consumer->rollback(); + } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } } AMQ_CATCH_ALL_THROW_CMSEXCEPTION() @@ -433,32 +457,53 @@ void ActiveMQSessionKernel::clearMessage this->executor->clearMessagesInProgress(); } - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - consumer->inProgressClearRequired(); - this->connection->getScheduler()->executeAfterDelay( - new ClearConsumerTask(consumer), 0LL); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + consumer->inProgressClearRequired(); + this->connection->getScheduler()->executeAfterDelay( + new ClearConsumerTask(consumer), 0LL); + } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } } //////////////////////////////////////////////////////////////////////////////// void ActiveMQSessionKernel::acknowledge() { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - consumer->acknowledge(); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + consumer->acknowledge(); + } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } } //////////////////////////////////////////////////////////////////////////////// void ActiveMQSessionKernel::deliverAcks() { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - consumer->deliverAcks(); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + consumer->deliverAcks(); + } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } } @@ -988,11 +1033,18 @@ void ActiveMQSessionKernel::redispatch(M //////////////////////////////////////////////////////////////////////////////// void ActiveMQSessionKernel::start() { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - consumer->start(); + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + consumer->start(); + } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } if (this->executor.get() != NULL) { @@ -1043,13 +1095,21 @@ void ActiveMQSessionKernel::createTempor //////////////////////////////////////////////////////////////////////////////// bool ActiveMQSessionKernel::isInUse(Pointer<ActiveMQDestination> destination) { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - if (consumer->isInUse(destination)) { - return true; + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + if (consumer->isInUse(destination)) { + this->config->consumerLock.readLock().unlock(); + return true; + } } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } return false; @@ -1123,7 +1183,15 @@ void ActiveMQSessionKernel::addConsumer( try { this->checkClosed(); - this->config->consumers.add(consumer); + + this->config->consumerLock.writeLock().lock(); + try { + this->config->consumers.add(consumer); + this->config->consumerLock.writeLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.writeLock().unlock(); + throw; + } // Register this as a message dispatcher for the consumer. this->connection->addDispatcher(consumer->getConsumerInfo()->getConsumerId(), this); @@ -1138,7 +1206,14 @@ void ActiveMQSessionKernel::removeConsum try { this->connection->removeDispatcher(consumer->getConsumerId()); - this->config->consumers.remove(consumer); + this->config->consumerLock.writeLock().lock(); + try { + this->config->consumers.remove(consumer); + this->config->consumerLock.writeLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.writeLock().unlock(); + throw; + } } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) @@ -1150,7 +1225,16 @@ void ActiveMQSessionKernel::addProducer( try { this->checkClosed(); - this->config->producers.add(producer); + + this->config->producerLock.writeLock().lock(); + try { + this->config->producers.add(producer); + this->config->producerLock.writeLock().unlock(); + } catch(Exception& ex) { + this->config->producerLock.writeLock().unlock(); + throw; + } + this->connection->addProducer(producer); } AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException ) @@ -1163,7 +1247,14 @@ void ActiveMQSessionKernel::removeProduc try { this->connection->removeProducer(producer->getProducerId()); - this->config->producers.remove(producer); + this->config->producerLock.writeLock().lock(); + try { + this->config->producers.remove(producer); + this->config->producerLock.writeLock().unlock(); + } catch(Exception& ex) { + this->config->producerLock.writeLock().unlock(); + throw; + } } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) @@ -1173,13 +1264,23 @@ void ActiveMQSessionKernel::removeProduc //////////////////////////////////////////////////////////////////////////////// Pointer<ActiveMQProducerKernel> ActiveMQSessionKernel::lookupProducerKernel(Pointer<ProducerId> id) { - std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator()); + this->config->producerLock.readLock().lock(); + try { + + std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator()); - while (producerIter->hasNext()) { - Pointer<ActiveMQProducerKernel> producer = producerIter->next(); - if (producer->getProducerId()->equals(*id)) { - return producer; + while (producerIter->hasNext()) { + Pointer<ActiveMQProducerKernel> producer = producerIter->next(); + if (producer->getProducerId()->equals(*id)) { + this->config->producerLock.readLock().unlock(); + return producer; + } } + + this->config->producerLock.readLock().unlock(); + } catch(Exception& ex) { + this->config->producerLock.readLock().unlock(); + throw; } return Pointer<ActiveMQProducerKernel>(); @@ -1188,13 +1289,21 @@ Pointer<ActiveMQProducerKernel> ActiveMQ //////////////////////////////////////////////////////////////////////////////// Pointer<ActiveMQConsumerKernel> ActiveMQSessionKernel::lookupConsumerKernel(Pointer<ConsumerId> id) { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - if (consumer->getConsumerId()->equals(*id)) { - return consumer; + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + if (consumer->getConsumerId()->equals(*id)) { + this->config->consumerLock.readLock().unlock(); + return consumer; + } } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } return Pointer<ActiveMQConsumerKernel>(); @@ -1203,13 +1312,21 @@ Pointer<ActiveMQConsumerKernel> ActiveMQ //////////////////////////////////////////////////////////////////////////////// bool ActiveMQSessionKernel::iterateConsumers() { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - if (consumer->iterate()) { - return true; + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + if (consumer->iterate()) { + this->config->consumerLock.readLock().unlock(); + return true; + } } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } return false; @@ -1218,29 +1335,43 @@ bool ActiveMQSessionKernel::iterateConsu //////////////////////////////////////////////////////////////////////////////// void ActiveMQSessionKernel::setPrefetchSize(Pointer<ConsumerId> id, int prefetch) { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - if (consumer->getConsumerId()->equals(*id)) { - consumer->setPrefetchSize(prefetch); + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + if (consumer->getConsumerId()->equals(*id)) { + consumer->setPrefetchSize(prefetch); + } } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } } //////////////////////////////////////////////////////////////////////////////// void ActiveMQSessionKernel::close(Pointer<ConsumerId> id) { - Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); + this->config->consumerLock.readLock().lock(); + try { + Pointer<Iterator< Pointer<ActiveMQConsumerKernel> > > iter(this->config->consumers.iterator()); - while (iter->hasNext()) { - Pointer<ActiveMQConsumerKernel> consumer = iter->next(); - if (consumer->getConsumerId()->equals(*id)) { - try { - consumer->close(); - } catch (cms::CMSException& e) { + while (iter->hasNext()) { + Pointer<ActiveMQConsumerKernel> consumer = iter->next(); + if (consumer->getConsumerId()->equals(*id)) { + try { + consumer->close(); + } catch (cms::CMSException& e) { + } } } + this->config->consumerLock.readLock().unlock(); + } catch (Exception& ex) { + this->config->consumerLock.readLock().unlock(); + throw; } }