Author: robbie
Date: Sun Mar 27 21:31:32 2011
New Revision: 1086040

URL: http://svn.apache.org/viewvc?rev=1086040&view=rev
Log:
QPID-3165: ensure all subscriptions are checked before making the decision on 
whether to stop delivering. Use a boolean instead of doing a 0/1 toggle and 
update variables to generally clarify logic. Use an int instead of a long for 
the iteration decrementing

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1086040&r1=1086039&r2=1086040&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Sun Mar 27 21:31:32 2011
@@ -1808,14 +1808,40 @@ public class SimpleAMQQueue implements A
     }
 
 
+    /**
+     * Used by queue Runners to asynchronously deliver messages to consumers.
+     *
+     * A queue Runner is started whenever a state change occurs, e.g. when a new
+     * message arrives on the queue and cannot be immediately delivered to a
+     * subscription (i.e. asynchronous delivery is required). Unless there are
+     * SubFlushRunners operating (due to subscriptions unsuspending) which are
+     * capable of accepting/delivering all messages then these messages would
+     * otherwise remain on the queue.
+     *
+     * processQueue should be running while there are messages on the queue AND
+     * there are subscriptions that can deliver them. If there are no
+     * subscriptions capable of delivering the remaining messages on the queue
+     * then processQueue should stop to prevent spinning.
+     *
+     * Since processQueue runs in a fixed size Executor, it should not run
+     * indefinitely to prevent starving other tasks of CPU (e.g. jobs to process
+     * incoming messages may not be able to be scheduled in the thread pool
+     * because all threads are working on clearing down large queues). To solve
+     * this problem, after an arbitrary number of message deliveries the
+     * processQueue job stops iterating, resubmits itself to the executor, and
+     * ends the current instance
+     *
+     * @param runner the Runner to schedule
+     * @throws AMQException
+     */
     private void processQueue(Runnable runner) throws AMQException
     {
         long stateChangeCount;
         long previousStateChangeCount = Long.MIN_VALUE;
         boolean deliveryIncomplete = true;
 
-        int extraLoops = 1;
-        long iterations = MAX_ASYNC_DELIVERIES;
+        boolean lastLoop = false;
+        int iterations = MAX_ASYNC_DELIVERIES;
 
         _asynchronousRunner.compareAndSet(runner, null);
 
@@ -1832,12 +1858,14 @@ public class SimpleAMQQueue implements A
 
             if (previousStateChangeCount != stateChangeCount)
             {
-                extraLoops = 1;
+                //further asynchronous delivery is required since the
+                //previous loop. keep going if iteration slicing allows.
+                lastLoop = false;
             }
 
             previousStateChangeCount = stateChangeCount;
-            deliveryIncomplete = _subscriptionList.size() != 0;
-            boolean done;
+            boolean allSubscriptionsDone = true;
+            boolean subscriptionDone;
 
             SubscriptionList.SubscriptionNodeIterator subscriptionIter = 
_subscriptionList.iterator();
             //iterate over the subscribers and try to advance their pointer
@@ -1847,30 +1875,25 @@ public class SimpleAMQQueue implements A
                 sub.getSendLock();
                 try
                 {
-
-                    done = attemptDelivery(sub);
-
-                    if (done)
+                    //attempt delivery. returns true if no further delivery currently possible to this sub
+                    subscriptionDone = attemptDelivery(sub);
+                    if (subscriptionDone)
                     {
-                        if (extraLoops == 0)
+                        //close autoClose subscriptions if we are not currently intent on continuing
+                        if (lastLoop && sub.isAutoClose())
                         {
-                            deliveryIncomplete = false;
-                            if (sub.isAutoClose())
-                            {
-                                unregisterSubscription(sub);
+                            unregisterSubscription(sub);
 
-                                sub.confirmAutoClose();
-                            }
-                        }
-                        else
-                        {
-                            extraLoops--;
+                            sub.confirmAutoClose();
                         }
                     }
                     else
                     {
+                        //this subscription can accept additional deliveries, so we must
+                        //keep going after this (if iteration slicing allows it)
+                        allSubscriptionsDone = false;
+                        lastLoop = false;
                         iterations--;
-                        extraLoops = 1;
                     }
                 }
                 finally
@@ -1878,10 +1901,34 @@ public class SimpleAMQQueue implements A
                     sub.releaseSendLock();
                 }
             }
+
+            if(allSubscriptionsDone && lastLoop)
+            {
+                //We have done an extra loop already and there are
+                //again no further delivery attempts possible, only
+                //keep going if state change demands it.
+                deliveryIncomplete = false;
+            }
+            else if(allSubscriptionsDone)
+            {
+                //All subscriptions reported being done, but we have to do
+                //an extra loop if the iterations are not exhausted and
+                //there is still any work to be done
+                deliveryIncomplete = _subscriptionList.size() != 0;
+                lastLoop = true;
+            }
+            else
+            {
+                //some subscriptions can still accept more messages,
+                //keep going if iteration count allows.
+                lastLoop = false;
+                deliveryIncomplete = true;
+            }
+
             _asynchronousRunner.set(null);
         }
 
-        // If deliveries == 0 then the limitting factor was the time-slicing rather than available messages or credit
+        // If iterations == 0 then the limiting factor was the time-slicing rather than available messages or credit
         // therefore we should schedule this runner again (unless someone beats us to it :-) ).
         if (iterations == 0 && _asynchronousRunner.compareAndSet(null, runner))
         {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to