This is an automated email from the ASF dual-hosted git repository. larsh pushed a commit to branch 4.x-HBase-1.2 in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.2 by this push: new 9ffe0d9 PHOENIX-5120 Avoid using MappedByteBuffers for server side sorting. 9ffe0d9 is described below commit 9ffe0d9b72ae7c531808ab63e2e9c4a8e8c60ec4 Author: Lars Hofhansl <la...@apache.org> AuthorDate: Mon Feb 4 21:07:26 2019 -0800 PHOENIX-5120 Avoid using MappedByteBuffers for server side sorting. --- .../phoenix/end2end/OrderByWithSpillingIT.java | 36 ++++++ .../apache/phoenix/execute/SortMergeJoinPlan.java | 46 ++++---- ...ppedByteBufferQueue.java => BufferedQueue.java} | 127 ++++++++------------- ...erSortedQueue.java => BufferedSortedQueue.java} | 55 ++++----- .../phoenix/iterate/OrderedResultIterator.java | 2 +- 5 files changed, 136 insertions(+), 130 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java new file mode 100644 index 0000000..c5eeaff --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/OrderByWithSpillingIT.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import java.util.Map; + +import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.util.ReadOnlyProps; +import org.junit.BeforeClass; + +import com.google.common.collect.Maps; + +public class OrderByWithSpillingIT extends OrderByIT { + @BeforeClass + public static void doSetup() throws Exception { + Map<String, String> props = Maps.newHashMapWithExpectedSize(1); + // do lot's of spooling! + props.put(QueryServices.SPOOL_THRESHOLD_BYTES_ATTRIB, Integer.toString(1)); + setUpTestDriver(new ReadOnlyProps(props.entrySet().iterator())); + } +} diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java index 978c7b4..55bba11 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/SortMergeJoinPlan.java @@ -20,8 +20,9 @@ package org.apache.phoenix.execute; import static org.apache.phoenix.util.NumberUtil.add; import static org.apache.phoenix.util.NumberUtil.getMin; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; -import java.nio.MappedByteBuffer; import java.sql.ParameterMetaData; import java.sql.SQLException; import java.util.Collections; @@ -51,7 +52,7 @@ import org.apache.phoenix.execute.visitor.ByteCountVisitor; import org.apache.phoenix.execute.visitor.QueryPlanVisitor; import org.apache.phoenix.expression.Expression; import org.apache.phoenix.iterate.DefaultParallelScanGrouper; -import org.apache.phoenix.iterate.MappedByteBufferQueue; +import org.apache.phoenix.iterate.BufferedQueue; import org.apache.phoenix.iterate.ParallelScanGrouper; import org.apache.phoenix.iterate.ResultIterator; import org.apache.phoenix.jdbc.PhoenixParameterMetaData; @@ -293,7 +294,7 @@ public class SortMergeJoinPlan implements QueryPlan { private ValueBitSet lhsBitSet; private ValueBitSet rhsBitSet; private byte[] emptyProjectedValue; - private MappedByteBufferTupleQueue queue; + private BufferedTupleQueue queue; private Iterator<Tuple> queueIterator; public BasicJoinIterator(ResultIterator lhsIterator, ResultIterator rhsIterator) { @@ -315,7 +316,7 @@ public class SortMergeJoinPlan implements QueryPlan { int len = lhsBitSet.getEstimatedLength(); this.emptyProjectedValue = new byte[len]; lhsBitSet.toBytes(emptyProjectedValue, 0); - this.queue = new MappedByteBufferTupleQueue(thresholdBytes); + this.queue = new BufferedTupleQueue(thresholdBytes); this.queueIterator = null; } @@ -609,24 +610,24 @@ public class SortMergeJoinPlan implements QueryPlan { } } - private static class MappedByteBufferTupleQueue extends MappedByteBufferQueue<Tuple> { + private static class BufferedTupleQueue extends BufferedQueue<Tuple> { - public MappedByteBufferTupleQueue(int thresholdBytes) { + public BufferedTupleQueue(int thresholdBytes) { super(thresholdBytes); } @Override - protected MappedByteBufferSegmentQueue<Tuple> createSegmentQueue( + protected BufferedSegmentQueue<Tuple> createSegmentQueue( int index, int thresholdBytes) { - return new MappedByteBufferTupleSegmentQueue(index, thresholdBytes, false); + return new BufferedTupleSegmentQueue(index, thresholdBytes, false); } @Override - protected Comparator<MappedByteBufferSegmentQueue<Tuple>> getSegmentQueueComparator() { - return new Comparator<MappedByteBufferSegmentQueue<Tuple>>() { + protected Comparator<BufferedSegmentQueue<Tuple>> getSegmentQueueComparator() { + return new Comparator<BufferedSegmentQueue<Tuple>>() { @Override - public int compare(MappedByteBufferSegmentQueue<Tuple> q1, - MappedByteBufferSegmentQueue<Tuple> q2) { + public int compare(BufferedSegmentQueue<Tuple> q1, + BufferedSegmentQueue<Tuple> q2) { return q1.index() - q2.index(); } }; @@ -635,7 +636,7 @@ public class SortMergeJoinPlan implements QueryPlan { @Override public Iterator<Tuple> iterator() { return new Iterator<Tuple>() { - private Iterator<MappedByteBufferSegmentQueue<Tuple>> queueIter; + private Iterator<BufferedSegmentQueue<Tuple>> queueIter; private Iterator<Tuple> currentIter; { this.queueIter = getSegmentQueues().iterator(); @@ -668,10 +669,10 @@ public class SortMergeJoinPlan implements QueryPlan { }; } - private static class MappedByteBufferTupleSegmentQueue extends MappedByteBufferSegmentQueue<Tuple> { + private static class BufferedTupleSegmentQueue extends BufferedSegmentQueue<Tuple> { private LinkedList<Tuple> results; - public MappedByteBufferTupleSegmentQueue(int index, + public BufferedTupleSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) { super(index, thresholdBytes, hasMaxQueueSize); this.results = Lists.newLinkedList(); @@ -688,23 +689,22 @@ public class SortMergeJoinPlan implements QueryPlan { return Bytes.SIZEOF_INT * 2 + kv.getLength(); } - @SuppressWarnings("deprecation") @Override - protected void writeToBuffer(MappedByteBuffer buffer, Tuple e) { + protected void writeToStream(DataOutputStream out, Tuple e) throws IOException { KeyValue kv = KeyValueUtil.ensureKeyValue(e.getValue(0)); - buffer.putInt(kv.getLength() + Bytes.SIZEOF_INT); - buffer.putInt(kv.getLength()); - buffer.put(kv.getBuffer(), kv.getOffset(), kv.getLength()); + out.writeInt(kv.getLength() + Bytes.SIZEOF_INT); + out.writeInt(kv.getLength()); + out.write(kv.getBuffer(), kv.getOffset(), kv.getLength()); } @Override - protected Tuple readFromBuffer(MappedByteBuffer buffer) { - int length = buffer.getInt(); + protected Tuple readFromStream(DataInputStream in) throws IOException { + int length = in.readInt(); if (length < 0) return null; byte[] b = new byte[length]; - buffer.get(b); + in.read(b); Result result = ResultUtil.toResult(new ImmutableBytesWritable(b)); return new ResultTuple(result); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java similarity index 70% rename from phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java rename to phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java index 135ab26..6f6c523 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedQueue.java @@ -17,13 +17,15 @@ */ package org.apache.phoenix.iterate; +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; import java.io.Closeable; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.MappedByteBuffer; -import java.nio.channels.FileChannel; -import java.nio.channels.FileChannel.MapMode; import java.util.AbstractQueue; import java.util.Comparator; import java.util.Iterator; @@ -34,26 +36,26 @@ import java.util.UUID; import com.google.common.collect.Lists; import com.google.common.collect.MinMaxPriorityQueue; -public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { +public abstract class BufferedQueue<T> extends AbstractQueue<T> { private final int thresholdBytes; - private List<MappedByteBufferSegmentQueue<T>> queues; + private List<BufferedSegmentQueue<T>> queues; private int currentIndex; - private MappedByteBufferSegmentQueue<T> currentQueue; - private MinMaxPriorityQueue<MappedByteBufferSegmentQueue<T>> mergedQueue; + private BufferedSegmentQueue<T> currentQueue; + private MinMaxPriorityQueue<BufferedSegmentQueue<T>> mergedQueue; - public MappedByteBufferQueue(int thresholdBytes) { + public BufferedQueue(int thresholdBytes) { this.thresholdBytes = thresholdBytes; - this.queues = Lists.<MappedByteBufferSegmentQueue<T>> newArrayList(); + this.queues = Lists.<BufferedSegmentQueue<T>> newArrayList(); this.currentIndex = -1; this.currentQueue = null; this.mergedQueue = null; } - abstract protected MappedByteBufferSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes); + abstract protected BufferedSegmentQueue<T> createSegmentQueue(int index, int thresholdBytes); - abstract protected Comparator<MappedByteBufferSegmentQueue<T>> getSegmentQueueComparator(); + abstract protected Comparator<BufferedSegmentQueue<T>> getSegmentQueueComparator(); - protected final List<MappedByteBufferSegmentQueue<T>> getSegmentQueues() { + protected final List<BufferedSegmentQueue<T>> getSegmentQueues() { return queues.subList(0, currentIndex + 1); } @@ -77,7 +79,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { public T poll() { initMergedQueue(); if (mergedQueue != null && !mergedQueue.isEmpty()) { - MappedByteBufferSegmentQueue<T> queue = mergedQueue.poll(); + BufferedSegmentQueue<T> queue = mergedQueue.poll(); T re = queue.poll(); if (queue.peek() != null) { mergedQueue.add(queue); @@ -98,7 +100,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { @Override public void clear() { - for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) { + for (BufferedSegmentQueue<T> queue : getSegmentQueues()) { queue.clear(); } currentIndex = -1; @@ -114,7 +116,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { @Override public int size() { int size = 0; - for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) { + for (BufferedSegmentQueue<T> queue : getSegmentQueues()) { size += queue.size(); } return size; @@ -125,7 +127,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { } public void close() { - for (MappedByteBufferSegmentQueue<T> queue : queues) { + for (BufferedSegmentQueue<T> queue : queues) { queue.close(); } queues.clear(); @@ -133,9 +135,9 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { private void initMergedQueue() { if (mergedQueue == null && currentIndex >= 0) { - mergedQueue = MinMaxPriorityQueue.<MappedByteBufferSegmentQueue<T>> orderedBy( + mergedQueue = MinMaxPriorityQueue.<BufferedSegmentQueue<T>> orderedBy( getSegmentQueueComparator()).maximumSize(currentIndex + 1).create(); - for (MappedByteBufferSegmentQueue<T> queue : getSegmentQueues()) { + for (BufferedSegmentQueue<T> queue : getSegmentQueues()) { T re = queue.peek(); if (re != null) { mergedQueue.add(queue); @@ -144,17 +146,14 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { } } - public abstract static class MappedByteBufferSegmentQueue<T> extends AbstractQueue<T> { + public abstract static class BufferedSegmentQueue<T> extends AbstractQueue<T> { protected static final int EOF = -1; - // at least create 128 KB MappedByteBuffers - private static final long DEFAULT_MAPPING_SIZE = 128 * 1024; private final int index; private final int thresholdBytes; private final boolean hasMaxQueueSize; private long totalResultSize = 0; private int maxResultSize = 0; - private long mappingSize = 0; private File file; private boolean isClosed = false; private boolean flushBuffer = false; @@ -164,7 +163,7 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { // iterators to close on close() private List<SegmentQueueFileIterator> iterators; - public MappedByteBufferSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) { + public BufferedSegmentQueue(int index, int thresholdBytes, boolean hasMaxQueueSize) { this.index = index; this.thresholdBytes = thresholdBytes; this.hasMaxQueueSize = hasMaxQueueSize; @@ -173,8 +172,8 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { abstract protected Queue<T> getInMemoryQueue(); abstract protected int sizeOf(T e); - abstract protected void writeToBuffer(MappedByteBuffer buffer, T e); - abstract protected T readFromBuffer(MappedByteBuffer buffer); + abstract protected void writeToStream(DataOutputStream out, T e) throws IOException; + abstract protected T readFromStream(DataInputStream in) throws IOException; public int index() { return this.index; @@ -253,7 +252,6 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { getInMemoryQueue().clear(); this.totalResultSize = 0; this.maxResultSize = 0; - this.mappingSize = 0; this.flushBuffer = false; this.flushedCount = 0; this.current = null; @@ -303,38 +301,25 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { totalResultSize = hasMaxQueueSize ? maxResultSize * inMemQueue.size() : (totalResultSize + resultSize); if (totalResultSize >= thresholdBytes) { this.file = File.createTempFile(UUID.randomUUID().toString(), null); - RandomAccessFile af = new RandomAccessFile(file, "rw"); - FileChannel fc = af.getChannel(); - int writeIndex = 0; - mappingSize = Math.min(Math.max(maxResultSize, DEFAULT_MAPPING_SIZE), totalResultSize); - MappedByteBuffer writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize); - - int resSize = inMemQueue.size(); - for (int i = 0; i < resSize; i++) { - T e = inMemQueue.poll(); - writeToBuffer(writeBuffer, e); - // buffer close to exhausted, re-map. - if (mappingSize - writeBuffer.position() < maxResultSize) { - writeIndex += writeBuffer.position(); - writeBuffer = fc.map(MapMode.READ_WRITE, writeIndex, mappingSize); + try (DataOutputStream out = new DataOutputStream( + new BufferedOutputStream(new FileOutputStream(file)))) { + int resSize = inMemQueue.size(); + for (int i = 0; i < resSize; i++) { + T e = inMemQueue.poll(); + writeToStream(out, e); } + out.writeInt(EOF); // end + flushedCount = resSize; + inMemQueue.clear(); + flushBuffer = true; } - writeBuffer.putInt(EOF); // end - fc.force(true); - fc.close(); - af.close(); - flushedCount = resSize; - inMemQueue.clear(); - flushBuffer = true; } } private class SegmentQueueFileIterator implements Iterator<T>, Closeable { private boolean isEnd; private long readIndex; - private RandomAccessFile af; - private FileChannel fc; - private MappedByteBuffer readBuffer; + private DataInputStream in; private T next; public SegmentQueueFileIterator() { @@ -354,9 +339,8 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { this.readIndex = readIndex; this.next = null; try { - this.af = new RandomAccessFile(file, "r"); - this.fc = af.getChannel(); - this.readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize); + this.in = new DataInputStream( + new BufferedInputStream(new FileInputStream(file))); } catch (IOException e) { throw new RuntimeException(e); } @@ -384,23 +368,17 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { private T readNext() { if (isEnd) return null; - - T e = readFromBuffer(readBuffer); + + T e = null; + try { + e = readFromStream(in); + } catch (IOException ex) { + throw new RuntimeException(ex); + } if (e == null) { close(); return null; } - - // buffer close to exhausted, re-map. - if (mappingSize - readBuffer.position() < maxResultSize) { - readIndex += readBuffer.position(); - try { - readBuffer = fc.map(MapMode.READ_ONLY, readIndex, mappingSize); - } catch (IOException ex) { - throw new RuntimeException(ex); - } - } - return e; } @@ -412,18 +390,9 @@ public abstract class MappedByteBufferQueue<T> extends AbstractQueue<T> { @Override public void close() { this.isEnd = true; - if (this.fc != null) { - try { - this.fc.close(); - } catch (IOException ignored) { - } - } - if (this.af != null) { - try { - this.af.close(); - } catch (IOException ignored) { - } - this.af = null; + try { + this.in.close(); + } catch (IOException ignored) { } } } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java similarity index 74% rename from phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java rename to phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java index ae2f452..36e23dd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/MappedByteBufferSortedQueue.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/BufferedSortedQueue.java @@ -17,8 +17,9 @@ */ package org.apache.phoenix.iterate; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; -import java.nio.MappedByteBuffer; import java.util.ArrayList; import java.util.Comparator; import java.util.List; @@ -34,11 +35,11 @@ import org.apache.phoenix.util.ResultUtil; import com.google.common.collect.MinMaxPriorityQueue; -public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEntry> { +public class BufferedSortedQueue extends BufferedQueue<ResultEntry> { private Comparator<ResultEntry> comparator; private final int limit; - public MappedByteBufferSortedQueue(Comparator<ResultEntry> comparator, + public BufferedSortedQueue(Comparator<ResultEntry> comparator, Integer limit, int thresholdBytes) throws IOException { super(thresholdBytes); this.comparator = comparator; @@ -46,25 +47,25 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt } @Override - protected org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry> createSegmentQueue( + protected BufferedSegmentQueue<ResultEntry> createSegmentQueue( int index, int thresholdBytes) { - return new MappedByteBufferResultEntryPriorityQueue(index, thresholdBytes, limit, comparator); + return new BufferedResultEntryPriorityQueue(index, thresholdBytes, limit, comparator); } @Override - protected Comparator<org.apache.phoenix.iterate.MappedByteBufferQueue.MappedByteBufferSegmentQueue<ResultEntry>> getSegmentQueueComparator() { - return new Comparator<MappedByteBufferSegmentQueue<ResultEntry>>() { + protected Comparator<BufferedSegmentQueue<ResultEntry>> getSegmentQueueComparator() { + return new Comparator<BufferedSegmentQueue<ResultEntry>>() { @Override - public int compare(MappedByteBufferSegmentQueue<ResultEntry> q1, - MappedByteBufferSegmentQueue<ResultEntry> q2) { + public int compare(BufferedSegmentQueue<ResultEntry> q1, + BufferedSegmentQueue<ResultEntry> q2) { return comparator.compare(q1.peek(), q2.peek()); }}; } - private static class MappedByteBufferResultEntryPriorityQueue extends MappedByteBufferSegmentQueue<ResultEntry> { + private static class BufferedResultEntryPriorityQueue extends BufferedSegmentQueue<ResultEntry> { private MinMaxPriorityQueue<ResultEntry> results = null; - public MappedByteBufferResultEntryPriorityQueue(int index, + public BufferedResultEntryPriorityQueue(int index, int thresholdBytes, int limit, Comparator<ResultEntry> comparator) { super(index, thresholdBytes, limit >= 0); this.results = limit < 0 ? @@ -84,54 +85,54 @@ public class MappedByteBufferSortedQueue extends MappedByteBufferQueue<ResultEnt @SuppressWarnings("deprecation") @Override - protected void writeToBuffer(MappedByteBuffer buffer, ResultEntry e) { + protected void writeToStream(DataOutputStream os, ResultEntry e) throws IOException { int totalLen = 0; List<KeyValue> keyValues = toKeyValues(e); for (KeyValue kv : keyValues) { totalLen += (kv.getLength() + Bytes.SIZEOF_INT); } - buffer.putInt(totalLen); + os.writeInt(totalLen); for (KeyValue kv : keyValues) { - buffer.putInt(kv.getLength()); - buffer.put(kv.getBuffer(), kv.getOffset(), kv + os.writeInt(kv.getLength()); + os.write(kv.getBuffer(), kv.getOffset(), kv .getLength()); } ImmutableBytesWritable[] sortKeys = e.sortKeys; - buffer.putInt(sortKeys.length); + os.writeInt(sortKeys.length); for (ImmutableBytesWritable sortKey : sortKeys) { if (sortKey != null) { - buffer.putInt(sortKey.getLength()); - buffer.put(sortKey.get(), sortKey.getOffset(), + os.writeInt(sortKey.getLength()); + os.write(sortKey.get(), sortKey.getOffset(), sortKey.getLength()); } else { - buffer.putInt(0); + os.writeInt(0); } } } @Override - protected ResultEntry readFromBuffer(MappedByteBuffer buffer) { - int length = buffer.getInt(); + protected ResultEntry readFromStream(DataInputStream is) throws IOException { + int length = is.readInt(); if (length < 0) return null; - + byte[] rb = new byte[length]; - buffer.get(rb); + is.read(rb); Result result = ResultUtil.toResult(new ImmutableBytesWritable(rb)); ResultTuple rt = new ResultTuple(result); - int sortKeySize = buffer.getInt(); + int sortKeySize = is.readInt(); ImmutableBytesWritable[] sortKeys = new ImmutableBytesWritable[sortKeySize]; for (int i = 0; i < sortKeySize; i++) { - int contentLength = buffer.getInt(); + int contentLength = is.readInt(); if (contentLength > 0) { byte[] sortKeyContent = new byte[contentLength]; - buffer.get(sortKeyContent); + is.read(sortKeyContent); sortKeys[i] = new ImmutableBytesWritable(sortKeyContent); } else { sortKeys[i] = null; } } - + return new ResultEntry(sortKeys, rt); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java index 36b274a..22712ff 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/OrderedResultIterator.java @@ -208,7 +208,7 @@ public class OrderedResultIterator implements PeekingResultIterator { List<Expression> expressions = Lists.newArrayList(Collections2.transform(orderByExpressions, TO_EXPRESSION)); final Comparator<ResultEntry> comparator = buildComparator(orderByExpressions); try{ - final MappedByteBufferSortedQueue queueEntries = new MappedByteBufferSortedQueue(comparator, limit, + final BufferedSortedQueue queueEntries = new BufferedSortedQueue(comparator, limit, thresholdBytes); resultIterator = new PeekingResultIterator() { int count = 0;