Author: jstrachan Date: Thu Dec 14 07:23:47 2006 New Revision: 487235 URL: http://svn.apache.org/viewvc?view=rev&rev=487235 Log: Patch for AMQ-1093 to avoid a deadlock if the transport is being reconnected from inside a MessageListener which is calling send(). Let's make the explicit clear of the consumer dispatch list asynchronous and perform it within the existing mutex.
Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?view=diff&rev=487235&r1=487234&r2=487235 ============================================================================== --- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java (original) +++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java Thu Dec 14 07:23:47 2006 @@ -17,24 +17,7 @@ */ package org.apache.activemq; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; - -import javax.jms.IllegalStateException; -import javax.jms.InvalidDestinationException; -import javax.jms.JMSException; -import javax.jms.Message; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; - -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ActiveMQMessage; -import org.apache.activemq.command.ConsumerId; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.MessageDispatch; -import org.apache.activemq.command.MessagePull; +import org.apache.activemq.command.*; import org.apache.activemq.management.JMSConsumerStatsImpl; import org.apache.activemq.management.StatsCapable; import org.apache.activemq.management.StatsImpl; @@ -47,6 +30,12 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import javax.jms.IllegalStateException; +import javax.jms.*; +import javax.jms.Message; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; import java.util.concurrent.ExecutorService; import 
java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -121,6 +110,7 @@ private AtomicBoolean deliveryingAcknowledgements = new AtomicBoolean(); private ExecutorService executorService = null; private MessageTransformer transformer; + private boolean clearDispatchList; /** * Create a MessageConsumer @@ -569,7 +559,14 @@ } void clearMessagesInProgress(){ - unconsumedMessages.clear(); + // we are called from inside the transport reconnection logic + // which involves us clearing all the connections' consumers + // dispatch lists and clearing them + // so rather than trying to grab a mutex (which could be already + // owned by the message listener calling the send) we will just set + // a flag so that the list can be cleared as soon as the + // dispatch thread is ready to flush the dispatch list + clearDispatchList= true; } void deliverAcks(){ @@ -859,7 +856,13 @@ MessageListener listener = this.messageListener; try { synchronized(unconsumedMessages.getMutex()){ - if (!unconsumedMessages.isClosed()) { + if (clearDispatchList) { + // we are reconnecting so lets flush the in progress messages + clearDispatchList = false; + unconsumedMessages.clear(); + } + + if (!unconsumedMessages.isClosed()) { if (listener != null && unconsumedMessages.isRunning() ) { ActiveMQMessage message = createActiveMQMessage(md); beforeMessageIsConsumed(md);