Author: ritchiem
Date: Fri Apr  3 14:15:08 2009
New Revision: 761688

URL: http://svn.apache.org/viewvc?rev=761688&view=rev
Log:
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and 
purger threads will stop when the inMemory values are within the correct range.

Merge of r761671 and r761674 from trunk

Modified:
    qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java

Modified: qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL: http://svn.apache.org/viewvc/qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=761688&r1=761687&r2=761688&view=diff
==============================================================================
--- qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java (original)
+++ qpid/branches/0.5-release/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java Fri Apr  3 14:15:08 2009
@@ -359,7 +359,12 @@
         _asynchronousInhaler.compareAndSet(messageInhaler, null);
         int inhaled = 1;
 
+        //Because we may not be able to totally fill up to _memoryUsageMaximum we need to be able to say we've done
+        // enough loading and this inhale process should stop
+        boolean finshedInhaling = false;
+
         while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+               && !finshedInhaling // Have we loaded all we can fit into memory
                && (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we haven't loaded all that is available
                && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
                && (inhaled > 0) // ensure we could inhale something
@@ -379,7 +384,9 @@
             // we won't have checked the last entry to see if we can load it. So create atEndofList and update it based
             // on the return from advance() which returns true if it can advance.
             boolean atEndofList = false;
-            while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't filled our max memory
+
+            while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we haven't filled our max memory
+                   && !finshedInhaling // Have we loaded all we can fit into memory
                    && (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs we do
                    && !atEndofList) // We have reached end of list QueueEntries
             {
@@ -394,7 +401,7 @@
                         {
                             _log.debug("Entry won't fit in memory stopping inhaler:" + entry.debugIdentity());
                         }
-                        inhaled = BATCH_PROCESS_COUNT;
+                        finshedInhaling = true;
                     }
                     else
                     {
@@ -421,7 +428,7 @@
         }
 
         //If we have become flowed or have more capacity since we stopped then schedule the thread to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+        if (!finshedInhaling && _flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
         {
             if (_log.isInfoEnabled())
             {
@@ -471,7 +478,7 @@
         _asynchronousPurger.compareAndSet(messagePurger, null);
         int purged = 0;
 
-        while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+        while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
                && purged < BATCH_PROCESS_COUNT
                && _asynchronousPurger.compareAndSet(null, messagePurger))
         {
@@ -496,6 +503,12 @@
                 if (entry.isAvailable() && !entry.isFlowed())
                 {
                     memoryUsage += entry.getSize();
+                    // If this message is what puts us over the limit then break
+                    // out of this loop as we need to purge this item.
+                    if (memoryUsage > _memoryUsageMaximum)
+                    {
+                        break;
+                    }
                 }
 
                 atTail = !iterator.advance();
@@ -525,7 +538,7 @@
         }
 
         //If we are still flowed and are over the minimum value then schedule to run again.
-        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+        if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
         {
             _log.info("Rescheduling Purger:" + _queue.getName());
             _purger.execute(messagePurger);



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to