Repository: accumulo
Updated Branches:
  refs/heads/master 8253ca05f -> 67670cdda


ACCUMULO-2827: HeapIterator optimization. Assumes it's more probable that the 
next entry in a merge comes from the current iterator. Also switching from 
PriorityBuffer to PriorityQueue<> to avoid unsafe casts

Signed-off-by: Keith Turner <ktur...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/9801fe97
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/9801fe97
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/9801fe97

Branch: refs/heads/master
Commit: 9801fe976ab5b86b3ed337daff586b96d2cb7e4d
Parents: 886cd19
Author: Jonathan Park <p...@sqrrl.com>
Authored: Mon Jun 23 15:07:27 2014 -0400
Committer: Keith Turner <ktur...@apache.org>
Committed: Thu Jun 26 10:45:51 2014 -0400

----------------------------------------------------------------------
 .../core/iterators/system/HeapIterator.java     | 132 ++++++++++---------
 1 file changed, 73 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/9801fe97/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
 
b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
index e54f37c..4b26dcc 100644
--- 
a/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
+++ 
b/core/src/main/java/org/apache/accumulo/core/iterators/system/HeapIterator.java
@@ -17,99 +17,113 @@
 package org.apache.accumulo.core.iterators.system;
 
 import java.io.IOException;
+import java.util.Comparator;
+import java.util.PriorityQueue;
 
 import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.commons.collections.buffer.PriorityBuffer;
 
 public abstract class HeapIterator implements 
SortedKeyValueIterator<Key,Value> {
-  private PriorityBuffer heap;
-  private SortedKeyValueIterator<Key,Value> currentIter;
-  
-  private static class Index implements Comparable<Index> {
-    SortedKeyValueIterator<Key,Value> iter;
-    
-    public Index(SortedKeyValueIterator<Key,Value> iter) {
-      this.iter = iter;
-    }
-    
-    public int compareTo(Index o) {
-      return iter.getTopKey().compareTo(o.iter.getTopKey());
+  private PriorityQueue<SortedKeyValueIterator<Key,Value>> heap;
+  private SortedKeyValueIterator<Key,Value> topIdx = null;
+  private Key nextKey;
+
+  private static class SKVIComparator implements 
Comparator<SortedKeyValueIterator<Key,Value>> {
+
+    @Override
+    public int compare(SortedKeyValueIterator<Key,Value> o1, 
SortedKeyValueIterator<Key,Value> o2) {
+      return o1.getTopKey().compareTo(o2.getTopKey());
     }
   }
-  
+
   protected HeapIterator() {
     heap = null;
   }
-  
+
   protected HeapIterator(int maxSize) {
     createHeap(maxSize);
   }
-  
+
   protected void createHeap(int maxSize) {
     if (heap != null)
       throw new IllegalStateException("heap already exist");
-    
-    heap = new PriorityBuffer(maxSize == 0 ? 1 : maxSize);
+
+    heap = new PriorityQueue<SortedKeyValueIterator<Key,Value>>(maxSize == 0 ? 
1 : maxSize, new SKVIComparator());
   }
-  
+
   @Override
   final public Key getTopKey() {
-    return currentIter.getTopKey();
+    return topIdx.getTopKey();
   }
-  
+
   @Override
   final public Value getTopValue() {
-    return currentIter.getTopValue();
+    return topIdx.getTopValue();
   }
-  
+
   @Override
   final public boolean hasTop() {
-    return heap.size() > 0;
+    return topIdx != null;
   }
-  
+
   @Override
   final public void next() throws IOException {
-    switch (heap.size()) {
-      case 0:
-        throw new IllegalStateException("Called next() when there is no top");
-      case 1:
-        // optimization for case when heap contains one entry,
-        // avoids remove and add
-        currentIter.next();
-        if (!currentIter.hasTop()) {
-          heap.remove();
-          currentIter = null;
-        }
-        break;
-      default:
-        Index idx = (Index) heap.remove();
-        idx.iter.next();
-        if (idx.iter.hasTop()) {
-          heap.add(idx);
-        }
-        // to get to the default case heap has at least
-        // two entries, therefore there must be at least
-        // one entry when get() is called below
-        currentIter = ((Index) heap.get()).iter;
+    if (topIdx == null) {
+      throw new IllegalStateException("Called next() when there is no top");
+    }
+
+    topIdx.next();
+    if (!topIdx.hasTop()) {
+      if (nextKey == null) {
+        // No iterators left
+        topIdx = null;
+        return;
+      }
+
+      pullReferencesFromHeap();
+    } else {
+      if (nextKey == null) {
+        // topIdx is the only iterator
+        return;
+      }
+
+      if (nextKey.compareTo(topIdx.getTopKey()) < 0) {
+        // Grab the next top iterator and put the current top iterator back on 
the heap
+        // This updating of references is special-cased to save on percolation 
on edge cases
+        // since the current top is guaranteed to not be the minimum
+        SortedKeyValueIterator<Key,Value> nextTopIdx = heap.remove();
+        heap.add(topIdx);
+
+        topIdx = nextTopIdx;
+        nextKey = heap.peek().getTopKey();
+      }
     }
   }
-  
+
+  private void pullReferencesFromHeap() {
+    topIdx = heap.remove();
+    if (!heap.isEmpty()) {
+      nextKey = heap.peek().getTopKey();
+    } else {
+      nextKey = null;
+    }
+  }
+
   final protected void clear() {
     heap.clear();
-    currentIter = null;
+    topIdx = null;
+    nextKey = null;
   }
-  
+
   final protected void addSource(SortedKeyValueIterator<Key,Value> source) {
-    
-    if (source.hasTop())
-      heap.add(new Index(source));
-    
-    if (heap.size() > 0)
-      currentIter = ((Index) heap.get()).iter;
-    else
-      currentIter = null;
+    if (source.hasTop()) {
+      heap.add(source);
+      if (topIdx != null) {
+        heap.add(topIdx);
+      }
+
+      pullReferencesFromHeap();
+    }
   }
-  
 }

Reply via email to