Author: rgodfrey
Date: Wed Jan 25 14:40:30 2012
New Revision: 1235771
URL: http://svn.apache.org/viewvc?rev=1235771&view=rev
Log:
QPID-3780 : [Java Broker] reduce scavenge overhead on large queues
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1235771&r1=1235770&r2=1235771&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
Wed Jan 25 14:40:30 2012
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.qpid.server.message.ServerMessage;
@@ -46,6 +47,7 @@ public class SimpleQueueEntryList implem
private AtomicLong _scavenges = new AtomicLong(0L);
private final long _scavengeCount =
Integer.getInteger("qpid.queue.scavenge_count", 50);
+ private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new
AtomicReference<SimpleQueueEntryImpl>();
public SimpleQueueEntryList(AMQQueue queue)
@@ -55,28 +57,17 @@ public class SimpleQueueEntryList implem
_tail = _head;
}
- void advanceHead()
- {
- SimpleQueueEntryImpl next = _head.getNextNode();
- SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
-
- if (next == newNext)
- {
- if (_scavenges.incrementAndGet() > _scavengeCount)
- {
- _scavenges.set(0L);
- scavenge();
- }
- }
- }
-
void scavenge()
{
+ SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null);
SimpleQueueEntryImpl next = _head.getNextValidEntry();
- while (next != null)
+ if(hwm != null)
{
- next = next.getNextValidEntry();
+ while (next != null && hwm.compareTo(next)>0)
+ {
+ next = next.getNextValidEntry();
+ }
}
}
@@ -182,7 +173,24 @@ public class SimpleQueueEntryList implem
public void entryDeleted(SimpleQueueEntryImpl queueEntry)
{
- advanceHead();
+ SimpleQueueEntryImpl next = _head.getNextNode();
+ SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
+
+ // the head of the queue has not been deleted, hence the deletion must
have been mid queue.
+ if (next == newNext)
+ {
+ SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+ while(unscavengedHWM == null ||
unscavengedHWM.compareTo(queueEntry)<0)
+ {
+ _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
+ unscavengedHWM = _unscavengedHWM.get();
+ }
+ if (_scavenges.incrementAndGet() > _scavengeCount)
+ {
+ _scavenges.set(0L);
+ scavenge();
+ }
+ }
}
public int getPriorities()
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]