Author: gtully
Date: Mon Apr 12 14:03:18 2010
New Revision: 933240

URL: http://svn.apache.org/viewvc?rev=933240&view=rev
Log:
Resolve https://issues.apache.org/activemq/browse/AMQ-2693 - on a loaded machine, 
slow thread creation can delay interruption processing past the next dispatch, 
which can be problematic. Setting prefetch=1 works around the issue.

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=933240&r1=933239&r2=933240&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
 Mon Apr 12 14:03:18 2010
@@ -144,6 +144,7 @@ public class ActiveMQMessageConsumer imp
     private ExecutorService executorService;
     private MessageTransformer transformer;
     private boolean clearDispatchList;
+    boolean inProgressClearRequiredFlag;
 
     private MessageAck pendingAck;
     private long lastDeliveredSequenceId;
@@ -655,23 +656,32 @@ public class ActiveMQMessageConsumer imp
         this.session.asyncSendPacket(removeCommand);
     }
     
-    void clearMessagesInProgress() {
+    void inProgressClearRequired() {
+        inProgressClearRequiredFlag = true;
         // deal with delivered messages async to avoid lock contention with in 
progress acks
         clearDispatchList = true;
-        synchronized (unconsumedMessages.getMutex()) {            
-            if (LOG.isDebugEnabled()) {
-                LOG.debug(getConsumerId() + " clearing dispatched list (" + 
unconsumedMessages.size() + ") on transport interrupt");
-            }
-            // ensure unconsumed are rolledback up front as they may get 
redelivered to another consumer
-            List<MessageDispatch> list = unconsumedMessages.removeAll();
-            if (!this.info.isBrowser()) {
-                for (MessageDispatch old : list) {
-                    session.connection.rollbackDuplicate(this, 
old.getMessage());
+    }
+    
+    void clearMessagesInProgress() {
+        if (inProgressClearRequiredFlag) {
+            synchronized (unconsumedMessages.getMutex()) {
+                if (inProgressClearRequiredFlag) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug(getConsumerId() + " clearing dispatched list 
(" + unconsumedMessages.size() + ") on transport interrupt");
+                    }
+                    // ensure unconsumed are rolledback up front as they may 
get redelivered to another consumer
+                    List<MessageDispatch> list = 
unconsumedMessages.removeAll();
+                    if (!this.info.isBrowser()) {
+                        for (MessageDispatch old : list) {
+                            session.connection.rollbackDuplicate(this, 
old.getMessage());
+                        }
+                    }
+                    // allow dispatch on this connection to resume
+                    
session.connection.transportInterruptionProcessingComplete();
+                    inProgressClearRequiredFlag = false;
                 }
             }
         }
-        // allow dispatch on this connection to resume
-        session.connection.transportInterruptionProcessingComplete();
     }
 
     void deliverAcks() {
@@ -1192,6 +1202,7 @@ public class ActiveMQMessageConsumer imp
     public void dispatch(MessageDispatch md) {
         MessageListener listener = this.messageListener.get();
         try {
+            clearMessagesInProgress();
             clearDispatchList();
             synchronized (unconsumedMessages.getMutex()) {
                 if (!unconsumedMessages.isClosed()) {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=933240&r1=933239&r2=933240&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
 Mon Apr 12 14:03:18 2010
@@ -649,6 +649,7 @@ public class ActiveMQSession implements 
         // connection.transportInterruptionProcessingComplete()
         //
         for (final ActiveMQMessageConsumer consumer : consumers) {
+            consumer.inProgressClearRequired();
             scheduler.executeAfterDelay(new Runnable() {
                 public void run() {
                     consumer.clearMessagesInProgress();


Reply via email to