Author: ritchiem
Date: Fri Apr 3 13:33:42 2009
New Revision: 761671
URL: http://svn.apache.org/viewvc?rev=761671&view=rev
Log:
QPID-1784 Update to FlowableBaseQueueEntryList to ensure that the inhaler and
purger threads will stop when the inMemory values are within the correct range.
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java?rev=761671&r1=761670&r2=761671&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
Fri Apr 3 13:33:42 2009
@@ -359,7 +359,12 @@
_asynchronousInhaler.compareAndSet(messageInhaler, null);
int inhaled = 1;
+ //Because we may not be able to totally fill up to _memoryUsageMaximum
we need to be able to say we've done
+ // enough loading and this inhale process should stop
+ boolean finshedInhaling = false;
+
while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we havn't
filled our max memory
+ && !finshedInhaling // Have we loaded all we can fit into memory
&& (_atomicQueueInMemory.get() < _atomicQueueSize.get()) // we
haven't loaded all that is available
&& (inhaled < BATCH_PROCESS_COUNT) // limit the number of runs
we do
&& (inhaled > 0) // ensure we could inhale something
@@ -379,7 +384,9 @@
// we won't have checked the last entry to see if we can load it.
So create atEndofList and update it based
// on the return from advance() which returns true if it can
advance.
boolean atEndofList = false;
- while ((_atomicQueueInMemory.get() < _memoryUsageMaximum) // we
havn't filled our max memory
+
+ while ((_atomicQueueInMemory.get() <= _memoryUsageMaximum) // we
haven't filled our max memory
+ && !finshedInhaling // Have we loaded all we can fit into
memory
&& (inhaled < BATCH_PROCESS_COUNT) // limit the number of
runs we do
&& !atEndofList) // We have reached end of list QueueEntries
{
@@ -394,7 +401,7 @@
{
_log.debug("Entry won't fit in memory stopping
inhaler:" + entry.debugIdentity());
}
- inhaled = BATCH_PROCESS_COUNT;
+ finshedInhaling = true;
}
else
{
@@ -421,7 +428,7 @@
}
//If we have become flowed or have more capacity since we stopped then
schedule the thread to run again.
- if (_flowed.get() && _atomicQueueInMemory.get() < _memoryUsageMaximum)
+ if (!finshedInhaling && _flowed.get() && _atomicQueueInMemory.get() <
_memoryUsageMaximum)
{
if (_log.isInfoEnabled())
{
@@ -471,7 +478,7 @@
_asynchronousPurger.compareAndSet(messagePurger, null);
int purged = 0;
- while ((_atomicQueueInMemory.get() > _memoryUsageMinimum)
+ while ((_atomicQueueInMemory.get() > _memoryUsageMaximum)
&& purged < BATCH_PROCESS_COUNT
&& _asynchronousPurger.compareAndSet(null, messagePurger))
{
@@ -496,6 +503,12 @@
if (entry.isAvailable() && !entry.isFlowed())
{
memoryUsage += entry.getSize();
+ // If this message is what puts us over the limit then
break
+ // out of this loop as we need to purge this item.
+ if (memoryUsage > _memoryUsageMaximum)
+ {
+ break;
+ }
}
atTail = !iterator.advance();
@@ -525,7 +538,7 @@
}
//If we are still flowed and are over the minimum value then schedule
to run again.
- if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMinimum)
+ if (_flowed.get() && _atomicQueueInMemory.get() > _memoryUsageMaximum)
{
if (_log.isInfoEnabled())
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]