Author: rgodfrey
Date: Wed Jan 25 14:40:30 2012
New Revision: 1235771

URL: http://svn.apache.org/viewvc?rev=1235771&view=rev
Log:
QPID-3780 : [Java Broker] reduce scavenge overhead on large queues

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java?rev=1235771&r1=1235770&r2=1235771&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleQueueEntryList.java
 Wed Jan 25 14:40:30 2012
@@ -21,6 +21,7 @@
 package org.apache.qpid.server.queue;
 
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.qpid.server.message.ServerMessage;
 
@@ -46,6 +47,7 @@ public class SimpleQueueEntryList implem
 
     private AtomicLong _scavenges = new AtomicLong(0L);
     private final long _scavengeCount = 
Integer.getInteger("qpid.queue.scavenge_count", 50);
+    private final AtomicReference<SimpleQueueEntryImpl> _unscavengedHWM = new 
AtomicReference<SimpleQueueEntryImpl>();
 
 
     public SimpleQueueEntryList(AMQQueue queue)
@@ -55,28 +57,17 @@ public class SimpleQueueEntryList implem
         _tail = _head;
     }
 
-    void advanceHead()
-    {
-        SimpleQueueEntryImpl next = _head.getNextNode();
-        SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
-
-        if (next == newNext)
-        {
-            if (_scavenges.incrementAndGet() > _scavengeCount)
-            {
-                _scavenges.set(0L);
-                scavenge();
-            }
-        }
-    }
-
     void scavenge()
     {
+        SimpleQueueEntryImpl hwm = _unscavengedHWM.getAndSet(null);
         SimpleQueueEntryImpl next = _head.getNextValidEntry();
 
-        while (next != null)
+        if(hwm != null)
         {
-            next = next.getNextValidEntry();
+            while (next != null && hwm.compareTo(next)>0)
+            {
+                next = next.getNextValidEntry();
+            }
         }
     }
 
@@ -182,7 +173,24 @@ public class SimpleQueueEntryList implem
 
     public void entryDeleted(SimpleQueueEntryImpl queueEntry)
     {
-        advanceHead();
+        SimpleQueueEntryImpl next = _head.getNextNode();
+        SimpleQueueEntryImpl newNext = _head.getNextValidEntry();
+
+        // the head of the queue has not been deleted, hence the deletion must 
have been mid queue.
+        if (next == newNext)
+        {
+            SimpleQueueEntryImpl unscavengedHWM = _unscavengedHWM.get();
+            while(unscavengedHWM == null || 
unscavengedHWM.compareTo(queueEntry)<0)
+            {
+                _unscavengedHWM.compareAndSet(unscavengedHWM, queueEntry);
+                unscavengedHWM = _unscavengedHWM.get();
+            }
+            if (_scavenges.incrementAndGet() > _scavengeCount)
+            {
+                _scavenges.set(0L);
+                scavenge();
+            }
+        }
     }
 
     public int getPriorities()



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to