Author: kwall
Date: Thu Feb  9 14:31:33 2012
New Revision: 1242339

URL: http://svn.apache.org/viewvc?rev=1242339&view=rev
Log:
QPID-3821: Uncaught exception thrown in QueueRunner.run() could cause 
QueueRunner to remain stuck in RUNNING state permanently

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java?rev=1242339&r1=1242338&r2=1242339&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRunner.java
 Thu Feb  9 14:31:33 2012
@@ -46,7 +46,6 @@ public class QueueRunner implements Runn
     private static int SCHEDULED = 1;
     private static int RUNNING = 2;
 
-
     private final AtomicInteger _scheduled = new AtomicInteger(IDLE);
 
     private final AtomicBoolean _stateChange = new AtomicBoolean();
@@ -54,8 +53,6 @@ public class QueueRunner implements Runn
     private final AtomicLong _lastRunAgain = new AtomicLong();
     private final AtomicLong _lastRunTime = new AtomicLong();
 
-    private long _continues;
-
     public QueueRunner(SimpleAMQQueue queue)
     {
         _queue = queue;
@@ -86,23 +83,22 @@ public class QueueRunner implements Runn
                 }
                 else
                 {
-                    _logger.info(errorMessage + transe.getMessage());
+                    _logger.info(errorMessage + ' ' + transe.getMessage());
                 }
             }
             finally
             {
                 CurrentActor.remove();
-            }
-            _scheduled.compareAndSet(RUNNING, IDLE);
-            long stateChangeCount = _queue.getStateChangeCount();
-            _lastRunAgain.set(runAgain);
-            _lastRunTime.set(System.nanoTime());
-            if(runAgain == 0L || runAgain != stateChangeCount || 
_stateChange.compareAndSet(true,false))
-            {
-                _continues++;
-                if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+                _scheduled.compareAndSet(RUNNING, IDLE);
+                final long stateChangeCount = _queue.getStateChangeCount();
+                _lastRunAgain.set(runAgain);
+                _lastRunTime.set(System.nanoTime());
+                if(runAgain == 0L || runAgain != stateChangeCount || 
_stateChange.compareAndSet(true,false))
                 {
-                    _queue.execute(this);
+                    if(_scheduled.compareAndSet(IDLE, SCHEDULED))
+                    {
+                        _queue.execute(this);
+                    }
                 }
             }
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java?rev=1242339&r1=1242338&r2=1242339&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubFlushRunner.java
 Thu Feb  9 14:31:33 2012
@@ -26,6 +26,7 @@ import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.actors.CurrentActor;
 import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.transport.TransportException;
 
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -68,18 +69,30 @@ class SubFlushRunner implements Runnable
             }
             catch (AMQException e)
             {
-                _logger.error(e);
+                _logger.error("Exception during asynchronous delivery by " + 
toString(), e);
             }
-            finally
+            catch (final TransportException transe)
             {
-                CurrentActor.remove();
+                final String errorMessage = "Problem during asynchronous 
delivery by " + toString();
+                if(_logger.isDebugEnabled())
+                {
+                    _logger.debug(errorMessage, transe);
+                }
+                else
+                {
+                    _logger.info(errorMessage + ' ' + transe.getMessage());
+                }
             }
-            _scheduled.compareAndSet(RUNNING, IDLE);
-            if ((!complete || _stateChange.compareAndSet(true,false))&& 
!_sub.isSuspended())
+            finally
             {
-                if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+                CurrentActor.remove();
+                _scheduled.compareAndSet(RUNNING, IDLE);
+                if ((!complete || _stateChange.compareAndSet(true,false))&& 
!_sub.isSuspended())
                 {
-                    getQueue().execute(this);
+                    if(_scheduled.compareAndSet(IDLE,SCHEDULED))
+                    {
+                        getQueue().execute(this);
+                    }
                 }
             }
         }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to