Author: ritchiem
Date: Fri Apr  3 13:33:42 2009
New Revision: 761671

URL: http://svn.apache.org/viewvc?rev=761671&view=rev
Log:
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and 
purger threads will stop when the inMemory values are within the correct range.

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=761671&r1=761670&r2=761671&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Fri Apr  3 13:33:42 2009
@@ -359,7 +359,12 @@
         _asynchronousInhaler.compareAndSet(messageInhaler, null);
         int inhaled = 1;
 
+        //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done
+        // enough loading and this inhale process should stop
+        boolean finshedInhaling = false;
+
         while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't 
filled our max memory
+               && !finshedInhaling // Have we loaded all we can fit into memory
                && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we 
haven't loaded all that is available
                && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs 
we do
                && (inhaled > 0) // ensure we could inhale something
@@ -379,7 +384,9 @@
             // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
             // on the return from advance() which returns true if it can advance.
             boolean atEndofList = false;
-            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we 
havn't filled our max memory
+
+            while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we 
haven't filled our max memory
+                   && !finshedInhaling // Have we loaded all we can fit into 
memory
                    && (inhaled < BATCH_PROCESS_COUNT) // limit the number of 
runs we do
                    && !atEndofList) // We have reached end of list QueueEntries
             {
@@ -394,7 +401,7 @@
                         {
                             _log.debug("Entry won't fit in memory stopping 
inhaler:" + entry.debugIdentity());
                         }
-                        inhaled = BATCH_PROCESS_COUNT;
+                        finshedInhaling = true;
                     }
                     else
                     {
@@ -421,7 +428,7 @@
         }
 
         //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        if (!finshedInhaling _flowed.get() && _atomicQueueInMemory.get() < 
_memoryUsageMaximum)
         {
             if (_log.isInfoEnabled())
             {
@@ -471,7 +478,7 @@
         _asynchronousPurger.compareAndSet(messagePurger, null);
         int purged = 0;
 
-        while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+        while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
                && purged < BATCH_PROCESS_COUNT
                && _asynchronousPurger.compareAndSet(null, messagePurger))
         {
@@ -496,6 +503,12 @@
                 if (entry.isAvailable() && !entry.isFlowed())
                 {
                     memoryUsage += entry.getSize();
+                    // If this message is what puts us over the limit then break
+                    // out of this loop as we need to purge this item.
+                    if (memoryUsage > _memoryUsageMaximum)
+                    {
+                        break;
+                    }
                 }
 
                 atTail = !iterator.advance();
@@ -525,7 +538,7 @@
         }
 
         //If we are still flowed and are over the minimum value then schedule to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
         {
             if (_log.isInfoEnabled())
             {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to