Use a priority queue in MergeSortResultIterator (Ankit Singhal)
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/2dfede44 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/2dfede44 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/2dfede44 Branch: refs/heads/txn Commit: 2dfede4488a3086f5b78cae48a5367f486d8a49c Parents: 6478090 Author: Samarth <[email protected]> Authored: Thu Nov 12 15:03:04 2015 -0800 Committer: Samarth <[email protected]> Committed: Thu Nov 12 15:03:04 2015 -0800 ---------------------------------------------------------------------- .../MaterializedComparableResultIterator.java | 72 +++++++++ .../iterate/MergeSortResultIterator.java | 83 ++++++---- .../iterate/MergeSortResultIteratorTest.java | 159 +++++++++++++++---- 3 files changed, 254 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dfede44/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java new file mode 100644 index 0000000..093a098 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MaterializedComparableResultIterator.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.iterate; + +import java.sql.SQLException; +import java.util.Comparator; +import java.util.List; + +import org.apache.phoenix.schema.tuple.Tuple; + +/** + * Fully materialized comparable result iterator backed by the result iterator provided with + * comparator. No copy is made of the backing results iterator. + */ +public class MaterializedComparableResultIterator + implements PeekingResultIterator, Comparable<MaterializedComparableResultIterator> { + + private PeekingResultIterator delegate; + private Comparator<? super Tuple> comparator; + private Tuple current; + + public Tuple getCurrent() { + return current; + } + + public MaterializedComparableResultIterator(PeekingResultIterator delegate, + Comparator<? super Tuple> c) throws SQLException { + this.delegate = delegate; + this.comparator = c; + this.current = delegate.peek(); + } + + public Tuple next() throws SQLException { + Tuple next = delegate.next(); + this.current = delegate.peek(); + return next; + } + + public Tuple peek() throws SQLException { + return delegate.peek(); + } + + public void close() throws SQLException { + delegate.close(); + } + + @Override + public int compareTo(MaterializedComparableResultIterator o) { + return comparator.compare(this.getCurrent(), o.getCurrent()); + + } + + @Override + public void explain(List<String> planSteps) { + delegate.explain(planSteps); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dfede44/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java index 9ef3e70..75843eb 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/MergeSortResultIterator.java @@ -18,38 +18,58 @@ package org.apache.phoenix.iterate; import java.sql.SQLException; +import java.util.Comparator; import java.util.List; +import java.util.PriorityQueue; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.SQLCloseables; import org.apache.phoenix.util.ServerUtil; - /** - * - * Base class for a ResultIterator that does a merge sort on the list of iterators - * provided. - * - * + * Base class for a ResultIterator that does a merge sort on the list of iterators provided. * @since 1.2 */ public abstract class MergeSortResultIterator implements PeekingResultIterator { protected final ResultIterators resultIterators; protected final ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); private List<PeekingResultIterator> iterators; - + private PriorityQueue<MaterializedComparableResultIterator> minHeap; + private final IteratorComparator itrComparator = new IteratorComparator(); + public MergeSortResultIterator(ResultIterators iterators) { this.resultIterators = iterators; } - + private List<PeekingResultIterator> getIterators() throws SQLException { if (iterators == null) { iterators = resultIterators.getIterators(); } return iterators; } - + + private PriorityQueue<MaterializedComparableResultIterator> getMinHeap() throws SQLException { + if (minHeap == null) { + List<PeekingResultIterator> iterators = getIterators(); + minHeap = new PriorityQueue<MaterializedComparableResultIterator>(iterators.size()); + for (PeekingResultIterator itr : iterators) { + if (itr.peek() == null) { + itr.close(); + continue; + } + minHeap.add(new MaterializedComparableResultIterator(itr, itrComparator)); + } + } + return minHeap; + } + + private class IteratorComparator implements Comparator<Tuple> { + public int compare(Tuple c1, Tuple c2) { + return MergeSortResultIterator.this.compare(c1, c2); + } + } + @Override public void close() throws SQLException { SQLException toThrow = null; @@ -58,7 +78,7 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator { resultIterators.close(); } } catch (Exception e) { - toThrow = ServerUtil.parseServerException(e); + toThrow = ServerUtil.parseServerException(e); } finally { try { if (iterators != null) { @@ -79,36 +99,35 @@ public abstract class MergeSortResultIterator implements PeekingResultIterator { } abstract protected int compare(Tuple t1, Tuple t2); - - private PeekingResultIterator minIterator() throws SQLException { - List<PeekingResultIterator> iterators = getIterators(); - Tuple minResult = null; - PeekingResultIterator minIterator = EMPTY_ITERATOR; - for (int i = iterators.size()-1; i >= 0; i--) { - PeekingResultIterator iterator = iterators.get(i); - Tuple r = iterator.peek(); - if (r != null) { - if (minResult == null || compare(r, minResult) < 0) { - minResult = r; - minIterator = iterator; - } - continue; - } - iterator.close(); - iterators.remove(i); - } + + private MaterializedComparableResultIterator minIterator() throws SQLException { + PriorityQueue<MaterializedComparableResultIterator> minHeap = getMinHeap(); + MaterializedComparableResultIterator minIterator = minHeap.peek(); return minIterator; } - + @Override public Tuple peek() throws SQLException { - PeekingResultIterator iterator = minIterator(); + MaterializedComparableResultIterator iterator = minIterator(); + if (iterator == null) { + return null; + } return iterator.peek(); } @Override public Tuple next() throws SQLException { - PeekingResultIterator iterator = minIterator(); - return iterator.next(); + MaterializedComparableResultIterator iterator = minIterator(); + if (iterator == null) { + return null; + } + Tuple next = iterator.next(); + minHeap.poll(); + if (iterator.peek() != null) { + minHeap.add(iterator); + } else { + iterator.close(); + } + return next; } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/2dfede44/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java index 77e42b0..9b2e8de 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/iterate/MergeSortResultIteratorTest.java @@ -35,31 +35,41 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.AssertResults; import org.junit.Test; - public class MergeSortResultIteratorTest { private final static byte[] A = Bytes.toBytes("a"); private final static byte[] B = Bytes.toBytes("b"); @Test public void testMergeSort() throws Throwable { - Tuple[] results1 = new Tuple[] { - new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), - }; - Tuple[] results2 = new Tuple[] { - new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))) - }; - Tuple[] results3 = new Tuple[] { - new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), - new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), - }; - final List<PeekingResultIterator>results = new ArrayList<PeekingResultIterator>(Arrays.asList(new PeekingResultIterator[] {new MaterializedResultIterator(Arrays.asList(results1)), new MaterializedResultIterator(Arrays.asList(results2)), new MaterializedResultIterator(Arrays.asList(results3))})); - - Tuple[] expectedResults = new Tuple[] { - new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), - new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), - new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), - new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), - }; + Tuple[] results1 = + new Tuple[] { new SingleKeyValueTuple( + new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), }; + Tuple[] results2 = + new Tuple[] { new SingleKeyValueTuple( + new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))) }; + Tuple[] results3 = + new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), }; + + final List<PeekingResultIterator> results = + new ArrayList<PeekingResultIterator>(Arrays.asList(new PeekingResultIterator[] { + new MaterializedResultIterator(Arrays.asList(results1)), + new MaterializedResultIterator(Arrays.asList(results2)), + new MaterializedResultIterator(Arrays.asList(results3)) })); + + Tuple[] expectedResults = + new Tuple[] { + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), }; ResultIterators iterators = new ResultIterators() { @@ -76,16 +86,46 @@ public class MergeSortResultIteratorTest { @Override public void explain(List<String> planSteps) { } - - @Override - public List<KeyRange> getSplits() { - return Collections.emptyList(); - } - @Override - public List<List<Scan>> getScans() { - return Collections.emptyList(); - } + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } + + @Override + public void close() throws SQLException { + } + }; + ResultIterators reverseIterators = new ResultIterators() { + + @Override + public List<PeekingResultIterator> getIterators() throws SQLException { + return results; + } + + @Override + public int size() { + return results.size(); + } + + @Override + public void explain(List<String> planSteps) { + } + + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } @Override public void close() throws SQLException { @@ -95,4 +135,67 @@ public class MergeSortResultIteratorTest { AssertResults.assertResults(scanner, expectedResults); } + @Test + public void testReverseMergeSort() throws Throwable { + Tuple[] results1 = + new Tuple[] { new SingleKeyValueTuple( + new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))), }; + Tuple[] results2 = + new Tuple[] { new SingleKeyValueTuple( + new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, Bytes.toBytes(1))) }; + Tuple[] results3 = + new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), }; + final List<PeekingResultIterator> results = + new ArrayList<PeekingResultIterator>(Arrays.asList(new PeekingResultIterator[] { + new MaterializedResultIterator(Arrays.asList(results1)), + new MaterializedResultIterator(Arrays.asList(results2)), + new MaterializedResultIterator(Arrays.asList(results3)) })); + Tuple[] expectedResults = + new Tuple[] { + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(B, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), + new SingleKeyValueTuple(new KeyValue(A, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + Bytes.toBytes(1))), }; + ResultIterators iterators = new ResultIterators() { + + @Override + public List<PeekingResultIterator> getIterators() throws SQLException { + return results; + } + + @Override + public int size() { + return results.size(); + } + + @Override + public void explain(List<String> planSteps) { + } + + @Override + public List<KeyRange> getSplits() { + return Collections.emptyList(); + } + + @Override + public List<List<Scan>> getScans() { + return Collections.emptyList(); + } + + @Override + public void close() throws SQLException { + } + }; + ResultIterator scanner = new MergeSortRowKeyResultIterator(iterators, 0, true); + AssertResults.assertResults(scanner, expectedResults); + } + }
