Use a priority queue in MergeSortResultIterator (Ankit Singhal)

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2dfede44
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2dfede44
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2dfede44

Branch: refs/heads/txn
Commit: 2dfede4488a3086f5b78cae48a5367f486d8a49c
Parents: 6478090
Author: Samarth <[email protected]>
Authored: Thu Nov 12 15:03:04 2015 -0800
Committer: Samarth <[email protected]>
Committed: Thu Nov 12 15:03:04 2015 -0800

----------------------------------------------------------------------
 .../MaterializedComparableResultIterator.java   |  72 +++++++++
 .../iterate/MergeSortResultIterator.java        |  83 ++++++----
 .../iterate/MergeSortResultIteratorTest.java    | 159 +++++++++++++++----
 3 files changed, 254 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dfede44/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
new file mode 100644
index 0000000..093a098
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.iterate;
+
+import java.sql.SQLException;
+import java.util.Comparator;
+import java.util.List;
+
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Fully materialized comparable result iterator backed by the result iterator provided with
+ * comparator. No copy is made of the backing results iterator.
+ */
+public class MaterializedComparableResultIterator
+        implements PeekingResultIterator, Comparable<MaterializedComparableResultIterator> {
+
+    private PeekingResultIterator delegate;
+    private Comparator<? super Tuple> comparator;
+    private Tuple current;
+
+    public Tuple getCurrent() {
+        return current;
+    }
+
+    public MaterializedComparableResultIterator(PeekingResultIterator delegate,
+            Comparator<? super Tuple> c) throws SQLException {
+        this.delegate = delegate;
+        this.comparator = c;
+        this.current = delegate.peek();
+    }
+
+    public Tuple next() throws SQLException {
+        Tuple next = delegate.next();
+        this.current = delegate.peek();
+        return next;
+    }
+
+    public Tuple peek() throws SQLException {
+        return delegate.peek();
+    }
+
+    public void close() throws SQLException {
+        delegate.close();
+    }
+
+    @Override
+    public int compareTo(MaterializedComparableResultIterator o) {
+        return comparator.compare(this.getCurrent(), o.getCurrent());
+
+    }
+
+    @Override
+    public void explain(List<String> planSteps) {
+        delegate.explain(planSteps);
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dfede44/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
index 9ef3e70..75843eb 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java
@@ -18,38 +18,58 @@
 package org.apache.phoenix.iterate;
 
 import java.sql.SQLException;
+import java.util.Comparator;
 import java.util.List;
+import java.util.PriorityQueue;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.SQLCloseables;
 import org.apache.phoenix.util.ServerUtil;
 
-
 /**
- * 
- * Base class for a ResultIterator that does a merge sort on the list of iterators
- * provided.
- *
- * 
+ * Base class for a ResultIterator that does a merge sort on the list of iterators provided.
  * @since 1.2
  */
 public abstract class MergeSortResultIterator implements PeekingResultIterator {
     protected final ResultIterators resultIterators;
     protected final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable();
     private List<PeekingResultIterator> iterators;
-    
+    private PriorityQueue<MaterializedComparableResultIterator> minHeap;
+    private final IteratorComparator itrComparator = new IteratorComparator();
+
     public MergeSortResultIterator(ResultIterators iterators) {
         this.resultIterators = iterators;
     }
-    
+
     private List<PeekingResultIterator> getIterators() throws SQLException {
         if (iterators == null) {
             iterators = resultIterators.getIterators();
         }
         return iterators;
     }
-    
+
+    private PriorityQueue<MaterializedComparableResultIterator> getMinHeap() throws SQLException {
+        if (minHeap == null) {
+            List<PeekingResultIterator> iterators = getIterators();
+            minHeap = new PriorityQueue<MaterializedComparableResultIterator>(iterators.size());
+            for (PeekingResultIterator itr : iterators) {
+                if (itr.peek() == null) {
+                    itr.close();
+                    continue;
+                }
+                minHeap.add(new MaterializedComparableResultIterator(itr, itrComparator));
+            }
+        }
+        return minHeap;
+    }
+
+    private class IteratorComparator implements Comparator<Tuple> {
+        public int compare(Tuple c1, Tuple c2) {
+            return MergeSortResultIterator.this.compare(c1, c2);
+        }
+    }
+
     @Override
     public void close() throws SQLException {
         SQLException toThrow = null;
@@ -58,7 +78,7 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator {
                 resultIterators.close();
             }
         } catch (Exception e) {
-           toThrow = ServerUtil.parseServerException(e);
+            toThrow = ServerUtil.parseServerException(e);
         } finally {
             try {
                 if (iterators != null) {
@@ -79,36 +99,35 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator {
     }
 
     abstract protected int compare(Tuple t1, Tuple t2);
-    
-    private PeekingResultIterator minIterator() throws SQLException {
-        List<PeekingResultIterator> iterators = getIterators();
-        Tuple minResult = null;
-        PeekingResultIterator minIterator = EMPTY_ITERATOR;
-        for (int i = iterators.size()-1; i >= 0; i--) {
-            PeekingResultIterator iterator = iterators.get(i);
-            Tuple r = iterator.peek();
-            if (r != null) {
-                if (minResult == null || compare(r, minResult) < 0) {
-                    minResult = r;
-                    minIterator = iterator;
-                }
-                continue;
-            }
-            iterator.close();
-            iterators.remove(i);
-        }
+
+    private MaterializedComparableResultIterator minIterator() throws SQLException {
+        PriorityQueue<MaterializedComparableResultIterator> minHeap = getMinHeap();
+        MaterializedComparableResultIterator minIterator = minHeap.peek();
         return minIterator;
     }
-    
+
     @Override
     public Tuple peek() throws SQLException {
-        PeekingResultIterator iterator = minIterator();
+        MaterializedComparableResultIterator iterator = minIterator();
+        if (iterator == null) {
+            return null;
+        }
         return iterator.peek();
     }
 
     @Override
     public Tuple next() throws SQLException {
-        PeekingResultIterator iterator = minIterator();
-        return iterator.next();
+        MaterializedComparableResultIterator iterator = minIterator();
+        if (iterator == null) {
+            return null;
+        }
+        Tuple next = iterator.next();
+        minHeap.poll();
+        if (iterator.peek() != null) {
+            minHeap.add(iterator);
+        } else {
+            iterator.close();
+        }
+        return next;
     }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dfede44/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
index 77e42b0..9b2e8de 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java
@@ -35,31 +35,41 @@ import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.AssertResults;
 import org.junit.Test;
 
-
 public class MergeSortResultIteratorTest {
     private final static byte[] A = Bytes.toBytes("a");
     private final static byte[] B = Bytes.toBytes("b");
 
     @Test
     public void testMergeSort() throws Throwable {
-        Tuple[] results1 = new Tuple[] {
-                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
-            };
-        Tuple[] results2 = new Tuple[] {
-                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1)))
-            };
-        Tuple[] results3 = new Tuple[] {
-                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
-                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
-            };
-        final List<PeekingResultIterator>results = new ArrayList<PeekingResultIterator>(Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))}));
-
-        Tuple[] expectedResults = new Tuple[] {
-                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
-                new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
-                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
-                new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))),
-            };
+        Tuple[] results1 =
+                new Tuple[] { new SingleKeyValueTuple(
+                        new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), };
+        Tuple[] results2 =
+                new Tuple[] { new SingleKeyValueTuple(
+                        new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))) };
+        Tuple[] results3 =
+                new Tuple[] {
+                        new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))), };
+
+        final List<PeekingResultIterator> results =
+                new ArrayList<PeekingResultIterator>(Arrays.asList(new PeekingResultIterator[] {
+                        new MaterializedResultIterator(Arrays.asList(results1)),
+                        new MaterializedResultIterator(Arrays.asList(results2)),
+                        new MaterializedResultIterator(Arrays.asList(results3)) }));
+
+        Tuple[] expectedResults =
+                new Tuple[] {
+                        new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))), };
 
         ResultIterators iterators = new ResultIterators() {
 
@@ -76,16 +86,46 @@ public class MergeSortResultIteratorTest {
             @Override
             public void explain(List<String> planSteps) {
             }
-            
-                       @Override
-                       public List<KeyRange> getSplits() {
-                               return Collections.emptyList();
-                       }
 
-                       @Override
-                       public List<List<Scan>> getScans() {
-                               return Collections.emptyList();
-                       }
+            @Override
+            public List<KeyRange> getSplits() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public List<List<Scan>> getScans() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public void close() throws SQLException {
+            }
+        };
+        ResultIterators reverseIterators = new ResultIterators() {
+
+            @Override
+            public List<PeekingResultIterator> getIterators() throws SQLException {
+                return results;
+            }
+
+            @Override
+            public int size() {
+                return results.size();
+            }
+
+            @Override
+            public void explain(List<String> planSteps) {
+            }
+
+            @Override
+            public List<KeyRange> getSplits() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public List<List<Scan>> getScans() {
+                return Collections.emptyList();
+            }
 
             @Override
             public void close() throws SQLException {
@@ -95,4 +135,67 @@ public class MergeSortResultIteratorTest {
         AssertResults.assertResults(scanner, expectedResults);
     }
 
+    @Test
+    public void testReverseMergeSort() throws Throwable {
+        Tuple[] results1 =
+                new Tuple[] { new SingleKeyValueTuple(
+                        new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), };
+        Tuple[] results2 =
+                new Tuple[] { new SingleKeyValueTuple(
+                        new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))) };
+        Tuple[] results3 =
+                new Tuple[] {
+                        new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))), };
+        final List<PeekingResultIterator> results =
+                new ArrayList<PeekingResultIterator>(Arrays.asList(new PeekingResultIterator[] {
+                        new MaterializedResultIterator(Arrays.asList(results1)),
+                        new MaterializedResultIterator(Arrays.asList(results2)),
+                        new MaterializedResultIterator(Arrays.asList(results3)) }));
+        Tuple[] expectedResults =
+                new Tuple[] {
+                        new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))),
+                        new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                                Bytes.toBytes(1))), };
+        ResultIterators iterators = new ResultIterators() {
+
+            @Override
+            public List<PeekingResultIterator> getIterators() throws SQLException {
+                return results;
+            }
+
+            @Override
+            public int size() {
+                return results.size();
+            }
+
+            @Override
+            public void explain(List<String> planSteps) {
+            }
+
+            @Override
+            public List<KeyRange> getSplits() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public List<List<Scan>> getScans() {
+                return Collections.emptyList();
+            }
+
+            @Override
+            public void close() throws SQLException {
+            }
+        };
+        ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators, 0, true);
+        AssertResults.assertResults(scanner, expectedResults);
+    }
+
 }

Reply via email to