Author: sershe
Date: Sat Feb 21 08:01:06 2015
New Revision: 1661297
URL: http://svn.apache.org/r1661297
Log:
Change the way DiskRange-s are managed, and fix decref for cache, some more
bugfixes
Added:
hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
Modified:
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
Added:
hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java?rev=1661297&view=auto
==============================================================================
---
hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
(added)
+++
hive/branches/llap/common/src/java/org/apache/hadoop/hive/common/DiskRangeList.java
Sat Feb 21 08:01:06 2015
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.common;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/** Java linked list iterator interface is convoluted, and moreover concurrent
modifications
+ * of the same list by multiple iterators are impossible. Hence, this.
+ * Java also doesn't support multiple inheritance, so this cannot be done as
"aspect"... */
+public class DiskRangeList extends DiskRange {
+ private static final Log LOG = LogFactory.getLog(DiskRangeList.class);
+ public DiskRangeList prev, next;
+
+ public DiskRangeList(long offset, long end) {
+ super(offset, end);
+ }
+
+ /** Replaces this element with another in the list; returns the new element.
*/
+ public DiskRangeList replaceSelfWith(DiskRangeList other) {
+ other.prev = this.prev;
+ other.next = this.next;
+ if (this.prev != null) {
+ this.prev.next = other;
+ }
+ if (this.next != null) {
+ this.next.prev = other;
+ }
+ this.next = this.prev = null;
+ return other;
+ }
+
+ /** Inserts an element before current in the list; returns the new element.
*/
+ public DiskRangeList insertBefore(DiskRangeList other) {
+ other.prev = this.prev;
+ other.next = this;
+ if (this.prev != null) {
+ this.prev.next = other;
+ }
+ this.prev = other;
+ return other;
+ }
+
+ /** Inserts an element after current in the list; returns the new element. */
+ public DiskRangeList insertAfter(DiskRangeList other) {
+ other.next = this.next;
+ other.prev = this;
+ if (this.next != null) {
+ this.next.prev = other;
+ }
+ this.next = other;
+ return other;
+ }
+
+ /** Removes an element after current from the list. */
+ public void removeAfter() {
+ DiskRangeList other = this.next;
+ this.next = other.next;
+ if (this.next != null) {
+ this.next.prev = this;
+ }
+ other.next = other.prev = null;
+ }
+
+ /** Removes the current element from the list. */
+ public void removeSelf() {
+ if (this.prev != null) {
+ this.prev.next = this.next;
+ }
+ if (this.next != null) {
+ this.next.prev = this.prev;
+ }
+ this.next = this.prev = null;
+ }
+
+ /** Splits current element in the list, using DiskRange::slice */
+ public DiskRangeList split(long cOffset) {
+ insertAfter((DiskRangeList)this.slice(cOffset, end));
+ return replaceSelfWith((DiskRangeList)this.slice(offset, cOffset));
+ }
+
+ @VisibleForTesting
+ public int listSize() {
+ int result = 1;
+ DiskRangeList current = this.next;
+ while (current != null) {
+ ++result;
+ current = current.next;
+ }
+ return result;
+ }
+
+ @VisibleForTesting
+ public DiskRangeList[] listToArray() {
+ DiskRangeList[] result = new DiskRangeList[listSize()];
+ int i = 0;
+ DiskRangeList current = this.next;
+ while (current != null) {
+ result[i] = current;
+ ++i;
+ current = current.next;
+ }
+ return result;
+ }
+
+ public static class DiskRangeListCreateHelper {
+ private DiskRangeList tail = null, head;
+ public DiskRangeListCreateHelper() {
+ }
+
+ public DiskRangeList getTail() {
+ return tail;
+ }
+
+ public void addOrMerge(long offset, long end, boolean doMerge, boolean
doLogNew) {
+ if (doMerge && tail != null && overlap(tail.offset, tail.end, offset,
end)) {
+ tail.offset = Math.min(tail.offset, offset);
+ tail.end = Math.max(tail.end, end);
+ } else {
+ if (doLogNew) {
+ LOG.info("Creating new range; last range (which can include some
previous adds) was "
+ + tail);
+ }
+ DiskRangeList node = new DiskRangeList(offset, end);
+ if (tail == null) {
+ head = tail = node;
+ } else {
+ tail = tail.insertAfter(node);
+ }
+ }
+ }
+
+ private static boolean overlap(long leftA, long rightA, long leftB, long
rightB) {
+ if (leftA <= leftB) {
+ return rightA >= leftB;
+ }
+ return rightB >= leftA;
+ }
+
+ public DiskRangeList get() {
+ return head;
+ }
+
+ public DiskRangeList extract() {
+ DiskRangeList result = head;
+ head = null;
+ return result;
+ }
+ }
+
+ /**
+ * List in-place mutation helper - a bogus first element that is inserted
before list head,
+ * and thus remains constant even if head is replaced with some new range
via in-place list
+ * mutation. extract() can be used to obtain the modified list.
+ */
+ public static class DiskRangeListMutateHelper extends DiskRangeList {
+ public DiskRangeListMutateHelper(DiskRangeList head) {
+ super(-1, -1);
+ assert head != null;
+ assert head.prev == null;
+ this.next = head;
+ head.prev = this;
+ }
+
+ public DiskRangeList get() {
+ return next;
+ }
+
+ public DiskRangeList extract() {
+ DiskRangeList result = this.next;
+ assert result != null;
+ this.next = result.prev = null;
+ return result;
+ }
+ }
+}
\ No newline at end of file
Modified:
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
(original)
+++
hive/branches/llap/llap-client/src/java/org/apache/hadoop/hive/llap/io/api/cache/LowLevelCache.java
Sat Feb 21 08:01:06 2015
@@ -18,10 +18,10 @@
package org.apache.hadoop.hive.llap.io.api.cache;
-import java.util.LinkedList;
import java.util.List;
import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
public interface LowLevelCache {
@@ -29,8 +29,9 @@ public interface LowLevelCache {
* Gets file data for particular offsets. Null entries mean no data.
* @param file File name; MUST be interned.
* @param base base offset for the ranges (stripe offset in case of ORC).
+ * @return Resulting list of ranges (head may differ from the input range if it was replaced).
*/
- void getFileData(String fileName, LinkedList<DiskRange> ranges, long base);
+ DiskRangeList getFileData(String fileName, DiskRangeList range, long
baseOffset);
/**
* Puts file data into cache.
Modified:
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
(original)
+++
hive/branches/llap/llap-server/src/java/org/apache/hadoop/hive/llap/cache/LowLevelCacheImpl.java
Sat Feb 21 08:01:06 2015
@@ -29,6 +29,8 @@ import java.util.concurrent.atomic.Atomi
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.llap.io.api.cache.LowLevelCache;
@@ -71,22 +73,30 @@ public class LowLevelCacheImpl implement
}
@Override
- public void getFileData(String fileName, LinkedList<DiskRange> ranges, long
baseOffset) {
+ public DiskRangeList getFileData(String fileName, DiskRangeList ranges, long
baseOffset) {
+ if (ranges == null) return null;
FileCache subCache = cache.get(fileName);
- if (subCache == null || !subCache.incRef()) return;
+ if (subCache == null || !subCache.incRef()) return ranges;
try {
- ListIterator<DiskRange> dr = ranges.listIterator();
- while (dr.hasNext()) {
- getOverlappingRanges(baseOffset, dr, subCache.cache);
+ DiskRangeList prev = ranges.prev;
+ if (prev == null) {
+ prev = new DiskRangeListMutateHelper(ranges);
}
+ DiskRangeList current = ranges;
+ while (current != null) {
+ // We assume ranges in "ranges" are non-overlapping; thus, we will
save next in advance.
+ DiskRangeList next = current.next;
+ getOverlappingRanges(baseOffset, current, subCache.cache);
+ current = next;
+ }
+ return prev.next;
} finally {
subCache.decRef();
}
}
- private void getOverlappingRanges(long baseOffset, ListIterator<DiskRange>
drIter,
+ private void getOverlappingRanges(long baseOffset, DiskRangeList
currentNotCached,
ConcurrentSkipListMap<Long, LlapCacheableBuffer> cache) {
- DiskRange currentNotCached = drIter.next();
Iterator<Map.Entry<Long, LlapCacheableBuffer>> matches = cache.subMap(
currentNotCached.offset + baseOffset, currentNotCached.end +
baseOffset)
.entrySet().iterator();
@@ -108,46 +118,46 @@ public class LowLevelCacheImpl implement
}
cacheEnd = cacheOffset + buffer.declaredLength;
CacheChunk currentCached = new CacheChunk(buffer, cacheOffset, cacheEnd);
- currentNotCached = addCachedBufferToIter(drIter, currentNotCached,
currentCached);
+ currentNotCached = addCachedBufferToIter(currentNotCached,
currentCached);
// Now that we've added it into correct position, we can adjust it by
base offset.
currentCached.shiftBy(-baseOffset);
}
}
- private DiskRange addCachedBufferToIter(ListIterator<DiskRange> drIter,
- DiskRange currentNotCached, CacheChunk currentCached) {
+ /**
+ * Adds cached buffer to buffer list.
+ * @param currentNotCached Pointer to the list node where we are inserting.
+ * @param currentCached The cached buffer found for this node, to insert.
+ * @return The new currentNotCached pointer, following the cached buffer
insertion.
+ */
+ private DiskRangeList addCachedBufferToIter(
+ DiskRangeList currentNotCached, CacheChunk currentCached) {
// Both currentNotCached and currentCached already include baseOffset.
if (currentNotCached.offset == currentCached.offset) {
if (currentNotCached.end <= currentCached.end) { // we assume it's
always "==" now
// Replace the entire current DiskRange with new cached range.
- drIter.set(currentCached);
- currentNotCached = null;
+ currentNotCached.replaceSelfWith(currentCached);
+ return null;
} else {
// Insert the new cache range before the disk range.
currentNotCached.offset = currentCached.end;
- drIter.previous();
- drIter.add(currentCached);
- DiskRange dr = drIter.next();
- assert dr == currentNotCached;
+ currentNotCached.insertBefore(currentCached);
+ return currentNotCached;
}
} else {
assert currentNotCached.offset < currentCached.offset;
long originalEnd = currentNotCached.end;
currentNotCached.end = currentCached.offset;
- drIter.add(currentCached);
+ currentNotCached.insertAfter(currentCached);
if (originalEnd <= currentCached.end) { // we assume it's always "==" now
- // We have reached the end of the range and truncated the last
non-cached range.
- currentNotCached = null;
+ return null; // No more matches expected...
} else {
// Insert the new disk range after the cache range. TODO: not strictly
necessary yet?
- currentNotCached = new DiskRange(currentCached.end, originalEnd);
- drIter.add(currentNotCached);
- DiskRange dr = drIter.previous();
- assert dr == currentNotCached;
- drIter.next();
+ currentNotCached = new DiskRangeList(currentCached.end, originalEnd);
+ currentCached.insertAfter(currentNotCached);
+ return currentNotCached;
}
}
- return currentNotCached;
}
private boolean lockBuffer(LlapCacheableBuffer buffer) {
Modified:
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
(original)
+++
hive/branches/llap/llap-server/src/test/org/apache/hadoop/hive/llap/cache/TestLowLevelCacheImpl.java
Sat Feb 21 08:01:06 2015
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hive.llap.cache;
-import java.util.Iterator;
-import java.util.LinkedList;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -31,6 +29,9 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
@@ -106,8 +107,8 @@ public class TestLowLevelCacheImpl {
}
private void verifyCacheGet(LowLevelCacheImpl cache, String fileName,
Object... stuff) {
- LinkedList<DiskRange> input = new LinkedList<DiskRange>();
- Iterator<DiskRange> iter = null;
+ DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+ DiskRangeList iter = null;
int intCount = 0, lastInt = -1;
int resultCount = stuff.length;
for (Object obj : stuff) {
@@ -118,26 +119,24 @@ public class TestLowLevelCacheImpl {
lastInt = (Integer)obj;
intCount = 1;
} else {
- input.add(new DiskRange(lastInt, (Integer)obj));
+ list.addOrMerge(lastInt, (Integer)obj, true, true);
intCount = 0;
}
continue;
} else if (intCount >= 0) {
assertTrue(intCount == 0);
- assertFalse(input.isEmpty());
intCount = -1;
- cache.getFileData(fileName, input, 0);
- assertEquals(resultCount, input.size());
- iter = input.iterator();
+ iter = cache.getFileData(fileName, list.get(), 0);
+ assertEquals(resultCount, iter.listSize());
}
- assertTrue(iter.hasNext());
- DiskRange next = iter.next();
+ assertTrue(iter != null);
if (obj instanceof LlapMemoryBuffer) {
- assertTrue(next instanceof CacheChunk);
- assertSame(obj, ((CacheChunk)next).buffer);
+ assertTrue(iter instanceof CacheChunk);
+ assertSame(obj, ((CacheChunk)iter).buffer);
} else {
- assertTrue(next.equals(obj));
+ assertTrue(iter.equals(obj));
}
+ iter = iter.next;
}
}
@@ -217,25 +216,36 @@ public class TestLowLevelCacheImpl {
String fileName = isFn1 ? fn1 : fn2;
int fileIndex = isFn1 ? 1 : 2;
int count = rdm.nextInt(offsetsToUse);
- LinkedList<DiskRange> input = new LinkedList<DiskRange>();
- int[] offsets = new int[count];
- for (int j = 0; j < count; ++j) {
- int next = rdm.nextInt(offsetsToUse);
- input.add(dr(next, next + 1));
- offsets[j] = next;
- }
if (isGet) {
- cache.getFileData(fileName, input, 0);
+ DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
+ int[] offsets = new int[count];
+ for (int j = 0; j < count; ++j) {
+ int next = rdm.nextInt(offsetsToUse);
+ list.addOrMerge(next, next + 1, true, false);
+ offsets[j] = next;
+ }
+ DiskRangeList iter = cache.getFileData(fileName, list.get(), 0);
int j = -1;
- for (DiskRange dr : input) {
+ while (iter != null) {
++j;
- if (!(dr instanceof CacheChunk)) continue;
+ if (!(iter instanceof CacheChunk)) {
+ iter = iter.next;
+ continue;
+ }
++gets;
- LlapCacheableBuffer result =
(LlapCacheableBuffer)((CacheChunk)dr).buffer;
+ LlapCacheableBuffer result =
(LlapCacheableBuffer)((CacheChunk)iter).buffer;
assertEquals(makeFakeArenaIndex(fileIndex, offsets[j]),
result.arenaIndex);
cache.releaseBuffer(result);
+ iter = iter.next;
}
} else {
+ DiskRange[] ranges = new DiskRange[count];
+ int[] offsets = new int[count];
+ for (int j = 0; j < count; ++j) {
+ int next = rdm.nextInt(offsetsToUse);
+ ranges[j] = dr(next, next + 1);
+ offsets[j] = next;
+ }
LlapMemoryBuffer[] buffers = new LlapMemoryBuffer[count];
for (int j = 0; j < offsets.length; ++j) {
LlapCacheableBuffer buf = LowLevelCacheImpl.allocateFake();
@@ -243,8 +253,7 @@ public class TestLowLevelCacheImpl {
buf.arenaIndex = makeFakeArenaIndex(fileIndex, offsets[j]);
buffers[j] = buf;
}
- long[] mask = cache.putFileData(
- fileName, input.toArray(new DiskRange[count]), buffers, 0);
+ long[] mask = cache.putFileData(fileName, ranges, buffers, 0);
puts += buffers.length;
long maskVal = 0;
if (mask != null) {
@@ -276,18 +285,15 @@ public class TestLowLevelCacheImpl {
public Integer call() {
boolean isFirstFile = false;
Random rdm = new Random(1234 + Thread.currentThread().getId());
- LinkedList<DiskRange> input = new LinkedList<DiskRange>();
- DiskRange allOffsets = new DiskRange(0, offsetsToUse + 1);
int evictions = 0;
syncThreadStart(cdlIn, cdlOut);
while (rdmsDone.get() < 3) {
- input.clear();
- input.add(allOffsets);
+ DiskRangeList head = new DiskRangeList(0, offsetsToUse + 1);
isFirstFile = !isFirstFile;
String fileName = isFirstFile ? fn1 : fn2;
- cache.getFileData(fileName, input, 0);
- DiskRange[] results = input.toArray(new DiskRange[input.size()]);
- int startIndex = rdm.nextInt(input.size()), index = startIndex;
+ head = cache.getFileData(fileName, head, 0);
+ DiskRange[] results = head.listToArray();
+ int startIndex = rdm.nextInt(results.length), index = startIndex;
LlapCacheableBuffer victim = null;
do {
DiskRange r = results[index];
@@ -371,8 +377,8 @@ public class TestLowLevelCacheImpl {
return fake;
}
- private DiskRange dr(int from, int to) {
- return new DiskRange(from, to);
+ private DiskRangeList dr(int from, int to) {
+ return new DiskRangeList(from, to);
}
private DiskRange[] drs(int... offsets) {
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/EncodedReaderImpl.java
Sat Feb 21 08:01:06 2015
@@ -28,6 +28,9 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
import org.apache.hadoop.hive.llap.Consumer;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch;
@@ -38,6 +41,7 @@ import org.apache.hadoop.hive.ql.io.orc.
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndex;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.RowIndexEntry;
import org.apache.hadoop.hive.ql.io.orc.OrcProto.Stream;
+import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.CacheChunk;
import
org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils.ByteBufferAllocatorPool;
import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
@@ -129,7 +133,7 @@ public class EncodedReaderImpl implement
public final int streamIndexOffset;
public final OrcProto.Stream.Kind kind;
/** Iterators for the buffers; used to maintain position in per-rg
reading. */
- ListIterator<DiskRange> bufferIter;
+ DiskRangeList bufferIter;
/** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
StreamBuffer stripeLevelStream;
@@ -152,14 +156,12 @@ public class EncodedReaderImpl implement
// We are also not supposed to call setDone, since we are only part of the
operation.
long stripeOffset = stripe.getOffset();
// 1. Figure out what we have to read.
- LinkedList<DiskRange> rangesToRead = new LinkedList<DiskRange>();
long offset = 0; // Stream offset in relation to the stripe.
// 1.1. Figure out which columns have a present stream
boolean[] hasNull =
RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("The following columns have PRESENT streams: " +
DebugUtils.toString(hasNull));
}
- DiskRange lastRange = null;
// We assume stream list is sorted by column and that non-data
// streams do not interleave data streams for the same column.
@@ -168,6 +170,8 @@ public class EncodedReaderImpl implement
ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
boolean[] includedRgs = null;
boolean isCompressed = (codec != null);
+
+ DiskRangeListCreateHelper listToRead = new DiskRangeListCreateHelper();
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
int colIx = stream.getColumn();
@@ -202,71 +206,61 @@ public class EncodedReaderImpl implement
+ ", " + length + ", index position " + indexIx);
}
if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind,
encodings.get(colIx))) {
- lastRange = RecordReaderUtils.addEntireStreamToRanges(
- offset, length, lastRange, rangesToRead);
+ RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead,
true);
if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Will read whole stream " + streamKind + "; added to " +
lastRange);
+ LOG.info("Will read whole stream " + streamKind + "; added to " +
listToRead.getTail());
}
} else {
- lastRange = RecordReaderUtils.addRgFilteredStreamToRanges(stream,
includedRgs,
+ RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
codec != null, indexes[colIx], encodings.get(colIx),
types.get(colIx),
- bufferSize, hasNull[colIx], offset, length, lastRange,
rangesToRead);
+ bufferSize, hasNull[colIx], offset, length, listToRead, true);
}
offset += length;
}
// 2. Now, read all of the ranges from cache or disk.
+ DiskRangeListMutateHelper toRead = new
DiskRangeListMutateHelper(listToRead.get());
if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Resulting disk ranges to read: "
- + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+ LOG.info("Resulting disk ranges to read: " +
RecordReaderUtils.stringifyDiskRanges(toRead));
}
if (cache != null) {
- cache.getFileData(fileName, rangesToRead, stripeOffset);
+ cache.getFileData(fileName, toRead.next, stripeOffset);
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("Disk ranges after cache (base offset " + stripeOffset
- + "): " + RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+ + "): " + RecordReaderUtils.stringifyDiskRanges(toRead));
}
}
+
// Force direct buffers if we will be decompressing to direct cache.
- RecordReaderUtils.readDiskRanges(file, zcr, stripeOffset, rangesToRead,
cache.isDirectAlloc());
+ RecordReaderUtils.readDiskRanges(file, zcr, stripeOffset, toRead.next,
cache.isDirectAlloc());
- // 2.1. Separate buffers (relative to stream offset) for each stream from
the data we have.
- // TODO: given how we read, we could potentially get rid of this step?
- for (ColumnReadContext colCtx : colCtxs) {
- for (int i = 0; i < colCtx.streamCount; ++i) {
- StreamContext sctx = colCtx.streams[i];
- List<DiskRange> sb = RecordReaderUtils.getStreamBuffers(
- rangesToRead, sctx.offset, sctx.length);
- sctx.bufferIter = sb.listIterator();
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Column " + colCtx.colIx + " stream " + sctx.kind + " at "
+ sctx.offset + ","
- + sctx.length + " got ranges (relative to stream) "
- + RecordReaderUtils.stringifyDiskRanges(sb));
- }
- }
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Disk ranges after disk read (base offset " + stripeOffset
+ + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
// 3. Finally, decompress data, map per RG, and return to caller.
// We go by RG and not by column because that is how data is processed.
int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() /
rowIndexStride);
+ DiskRangeList iter = toRead.next; // Keep "toRead" list for future use,
don't extract().
for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
- boolean isLastRg = rgCount - rgIx - 1 == 0;
+ boolean isLastRg = rgIx == rgCount - 1;
// Create the batch we will use to return data for this RG.
EncodedColumnBatch<OrcBatchKey> ecb = new
EncodedColumnBatch<OrcBatchKey>(
new OrcBatchKey(fileName, stripeIx, rgIx), colRgs.length, 0);
boolean isRGSelected = true;
for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
+ // RG x col filtered.
isRGSelected = false;
- continue;
- } // RG x col filtered.
+ continue; // TODO#: this would be invalid with HL cache, where RG x
col can be excluded.
+ }
ColumnReadContext ctx = colCtxs[colIxMod];
RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
ecb.initColumn(colIxMod, ctx.colIx, ctx.streamCount);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
- long absStreamOffset = stripeOffset + sctx.offset;
StreamBuffer cb = null;
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
// This stream is for entire stripe and needed for every RG;
uncompress once and reuse.
@@ -274,13 +268,26 @@ public class EncodedReaderImpl implement
LOG.info("Getting stripe-level stream [" + sctx.kind + ", " +
ctx.encoding + "] for"
+ " column " + ctx.colIx + " RG " + rgIx + " at " +
sctx.offset + ", " + sctx.length);
}
- cb = getStripeLevelStream(absStreamOffset, sctx, cache, isLastRg);
+ if (sctx.stripeLevelStream == null) {
+ sctx.stripeLevelStream = new StreamBuffer(sctx.kind.getNumber());
+ // We will be using this for each RG while also sending RGs to
processing.
+ // To avoid buffers being unlocked, run refcount one ahead; we
will not increase
+ // it when building the last RG, so each RG processing will
decref once, and the
+ // last one will unlock the buffers.
+ sctx.stripeLevelStream.incRef();
+ iter = InStream.uncompressStream(fileName, stripeOffset, iter,
sctx.offset,
+ sctx.offset + sctx.length, zcr, codec, bufferSize, cache,
sctx.stripeLevelStream);
+ }
+ if (!isLastRg) {
+ sctx.stripeLevelStream.incRef();
+ }
+ cb = sctx.stripeLevelStream;
} else {
// This stream can be separated by RG using index. Let's do that.
- long cOffset = index.getPositions(sctx.streamIndexOffset),
- endCOffset =
RecordReaderUtils.estimateRgEndOffset(isCompressed, isLastRg,
- isLastRg ? sctx.length :
nextIndex.getPositions(sctx.streamIndexOffset),
- sctx.length, bufferSize);
+ long cOffset = index.getPositions(sctx.streamIndexOffset) +
sctx.offset,
+ nextCOffset = isLastRg ? sctx.length :
nextIndex.getPositions(sctx.streamIndexOffset),
+ endCOffset = RecordReaderUtils.estimateRgEndOffset(
+ isCompressed, isLastRg, nextCOffset, sctx.length,
bufferSize) + sctx.offset;
cb = new StreamBuffer(sctx.kind.getNumber());
cb.incRef();
if (DebugUtils.isTraceOrcEnabled()) {
@@ -289,8 +296,11 @@ public class EncodedReaderImpl implement
+ sctx.length + " index position " + sctx.streamIndexOffset
+ ": compressed ["
+ cOffset + ", " + endCOffset + ")");
}
- InStream.uncompressStream(fileName, absStreamOffset, zcr,
sctx.bufferIter,
- codec, bufferSize, cache, cOffset, endCOffset, cb);
+ boolean isStartOfStream = sctx.bufferIter == null;
+ DiskRangeList range = isStartOfStream ? iter : sctx.bufferIter;
+ DiskRangeList next = InStream.uncompressStream(fileName,
stripeOffset, range, cOffset,
+ endCOffset, zcr, codec, bufferSize, cache, cb);
+ sctx.bufferIter = iter = next; // Reset iter just to ensure it's
valid
}
ecb.setStreamData(colIxMod, streamIx, cb);
}
@@ -299,31 +309,14 @@ public class EncodedReaderImpl implement
consumer.consumeData(ecb);
}
}
- // TODO: WE NEED TO DECREF ALL THE CACHE BUFFERS ONCE
- }
- /**
- * Reads the entire stream for a column (e.g. a dictionary stream), or gets
it from context.
- * @param isLastRg Whether the stream is being read for last RG in stripe.
- * @return StreamBuffer that contains the entire stream.
- */
- private StreamBuffer getStripeLevelStream(long baseOffset, StreamContext ctx,
- LowLevelCache cache, boolean isLastRg) throws IOException {
- if (ctx.stripeLevelStream == null) {
- ctx.stripeLevelStream = new StreamBuffer(ctx.kind.getNumber());
- // We will be using this for each RG while also sending RGs to
processing.
- // To avoid buffers being unlocked, run refcount one ahead; we will not
increase
- // it when building the last RG, so each RG processing will decref once,
and the
- // last one will unlock the buffers.
- ctx.stripeLevelStream.incRef();
- InStream.uncompressStream(fileName, baseOffset, zcr,
- ctx.bufferIter, codec, bufferSize, cache, -1, -1,
ctx.stripeLevelStream);
- ctx.bufferIter = null;
- }
- if (!isLastRg) {
- ctx.stripeLevelStream.incRef();
+ DiskRangeList toFree = toRead.next;
+ while (toFree != null) {
+ if (toFree instanceof CacheChunk) {
+ cache.releaseBuffer(((CacheChunk)toFree).buffer);
+ }
+ toFree = toFree.next;
}
- return ctx.stripeLevelStream;
}
@Override
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/InStream.java
Sat Feb 21 08:01:06 2015
@@ -21,12 +21,14 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.LogLevels;
import org.apache.hadoop.hive.llap.io.api.EncodedColumnBatch.StreamBuffer;
@@ -142,7 +144,7 @@ public abstract class InStream extends I
(desired - curRange.offset) < curRange.getLength()) {
currentOffset = desired;
currentRange = i;
- this.range = curRange.getData();
+ this.range = curRange.getData().duplicate();
int pos = range.position();
pos += (int)(desired - curRange.offset); // this is why we duplicate
this.range.position(pos);
@@ -549,13 +551,22 @@ public abstract class InStream extends I
/**
* Uncompresses part of the stream. RGs can overlap, so we cannot just go
and decompress
* and remove what we have returned. We will keep iterator as a "hint" point.
- * TODO: Java LinkedList and iter have a really stupid interface. Replace
with own simple one?
- * @param zcr
+ * @param fileName File name for cache keys.
+ * @param baseOffset Absolute offset of boundaries and ranges relative to
file, for cache keys.
+ * @param start Ordered ranges containing file data. Helpful if they point
close to cOffset.
+ * @param cOffset Start offset to decompress.
+ * @param endCOffset End offset to decompress; estimate, partial CBs will be
ignored.
+ * @param zcr Zero-copy reader, if any, to release discarded buffers.
+ * @param codec Compression codec.
+ * @param bufferSize Compressed buffer (CB) size.
+ * @param cache Low-level cache to cache new data.
+ * @param streamBuffer Stream buffer, to add the results.
+ * @return Last buffer cached during decompression. Cache buffers are never
removed from
+ * the master list, so they are safe to keep as iterators for
various streams.
*/
- public static void uncompressStream(String fileName, long baseOffset,
- ZeroCopyReaderShim zcr, ListIterator<DiskRange> ranges,
- CompressionCodec codec, int bufferSize, LowLevelCache cache,
- long cOffset, long endCOffset, StreamBuffer streamBuffer)
+ public static DiskRangeList uncompressStream(String fileName, long
baseOffset,
+ DiskRangeList start, long cOffset, long endCOffset, ZeroCopyReaderShim
zcr,
+ CompressionCodec codec, int bufferSize, LowLevelCache cache, StreamBuffer
streamBuffer)
throws IOException {
streamBuffer.cacheBuffers = new ArrayList<LlapMemoryBuffer>();
List<ProcCacheChunk> toDecompress = null;
@@ -564,20 +575,20 @@ public abstract class InStream extends I
// 1. Find our bearings in the stream. Normally, iter will already point
either to where we
// want to be, or just before. However, RGs can overlap due to encoding,
so we may have
// to return to a previous block.
- DiskRange current = findCompressedPosition(ranges, cOffset);
+ DiskRangeList current = findCompressedPosition(start, cOffset);
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("Starting uncompressStream for [" + cOffset + "," + endCOffset
+ ") at " + current);
}
// 2. Go thru the blocks; add stuff to results and prepare the
decompression work (see below).
- if (cOffset >= 0 && cOffset != current.offset) {
- // We adjust offsets as we decompress, we expect to decompress
sequentially, and we cache and
- // decompress entire CBs (obviously). Therefore the offset in the next
DiskRange should
- // always be the start offset of a CB. TODO: what about at start?
- throw new AssertionError("Unexpected offset - for " + cOffset + ", got "
+ current.offset);
+ if (cOffset > current.offset) {
+ // Target compression block is in the middle of the range; slice the
range in two.
+ current = current.split(cOffset).next;
}
long currentCOffset = cOffset;
+ DiskRangeList lastCached = null;
while (true) {
+ DiskRangeList next = null;
if (current instanceof CacheChunk) {
// 2a. This is a cached compression buffer, add as is.
CacheChunk cc = (CacheChunk)current;
@@ -587,6 +598,8 @@ public abstract class InStream extends I
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("Adding an already-uncompressed buffer " + cc.buffer);
}
+ lastCached = current;
+ next = current.next;
} else {
// 2b. This is a compressed buffer. We need to uncompress it; the
buffer can comprise
// several disk ranges, so we might need to combine them.
@@ -596,18 +609,21 @@ public abstract class InStream extends I
toRelease = (zcr == null) ? null : new ArrayList<ByteBuffer>();
}
long originalOffset = bc.offset;
- int compressedBytesConsumed = addOneCompressionBuffer(bc, ranges, zcr,
bufferSize,
+ next = addOneCompressionBuffer(bc, zcr, bufferSize,
cache, streamBuffer.cacheBuffers, toDecompress, toRelease);
- if (compressedBytesConsumed == -1) {
- // endCOffset is an estimate; we have a partially-read compression
block, ignore it
- break;
+ if (next != null) {
+ currentCOffset = next.offset;
+ lastCached = next.prev;
+ // addOne... always adds one CC and returns next range after it
+ assert lastCached instanceof CacheChunk;
+ } else {
+ currentCOffset = originalOffset;
}
- currentCOffset = originalOffset + compressedBytesConsumed;
}
- if ((endCOffset >= 0 && currentCOffset >= endCOffset) ||
!ranges.hasNext()) {
+ if ((endCOffset >= 0 && currentCOffset >= endCOffset) || next == null) {
break;
}
- current = ranges.next();
+ current = next;
}
// 3. Allocate the buffers, prepare cache keys.
@@ -615,7 +631,7 @@ public abstract class InStream extends I
// data and some unallocated membufs for decompression. toDecompress
contains all the work we
// need to do, and each item points to one of the membufs in cacheBuffers
as target. The iter
// has also been adjusted to point to these buffers instead of compressed
data for the ranges.
- if (toDecompress == null) return; // Nothing to decompress.
+ if (toDecompress == null) return lastCached; // Nothing to decompress.
LlapMemoryBuffer[] targetBuffers = new
LlapMemoryBuffer[toDecompress.size()];
DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
@@ -650,36 +666,23 @@ public abstract class InStream extends I
// 6. Finally, put data to cache.
cache.putFileData(fileName, cacheKeys, targetBuffers, baseOffset);
+ return lastCached;
}
/** Finds compressed offset in a stream and makes sure iter points to its
position.
This may be necessary for obscure combinations of compression and
encoding boundaries. */
- private static DiskRange findCompressedPosition(
- ListIterator<DiskRange> ranges, long cOffset) {
- if (cOffset < 0) return ranges.next();
- DiskRange current = null;
- boolean doCallNext = false;
- if (ranges.hasNext()) {
- current = ranges.next();
- } else if (ranges.hasPrevious()) {
- current = ranges.previous();
- doCallNext = true;
- }
+ private static DiskRangeList findCompressedPosition(
+ DiskRangeList ranges, long cOffset) {
+ if (cOffset < 0) return ranges;
// We expect the offset to be valid TODO: rather, validate
- while (current.end <= cOffset) {
- current = ranges.next();
- doCallNext = false;
- }
- while (current.offset > cOffset) {
- current = ranges.previous();
- doCallNext = true;
- }
- if (doCallNext) {
- // TODO: WTF?
- ranges.next(); // We called previous, make sure next is the real next
and not current.
+ while (ranges.end <= cOffset) {
+ ranges = ranges.next;
}
- return current;
+ while (ranges.offset > cOffset) {
+ ranges = ranges.prev;
+ }
+ return ranges;
}
@@ -695,9 +698,8 @@ public abstract class InStream extends I
* @param toRelease The list of buffers to release to zcr because they are
no longer in use.
* @return The total number of compressed bytes consumed.
*/
- private static int addOneCompressionBuffer(BufferChunk current,
- ListIterator<DiskRange> ranges, ZeroCopyReaderShim zcr, int bufferSize,
- LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
+ private static DiskRangeList addOneCompressionBuffer(BufferChunk current,
ZeroCopyReaderShim zcr,
+ int bufferSize, LowLevelCache cache, List<LlapMemoryBuffer> cacheBuffers,
List<ProcCacheChunk> toDecompress, List<ByteBuffer> toRelease) throws
IOException {
ByteBuffer slice = null;
ByteBuffer compressed = current.chunk;
@@ -721,19 +723,15 @@ public abstract class InStream extends I
// Simple case - CB fits entirely in the disk range.
slice = compressed.slice();
slice.limit(chunkLength);
- addOneCompressionBlockByteBuffer(slice, isUncompressed, cbStartOffset,
cbEndOffset,
- chunkLength, ranges, current, cache, toDecompress, cacheBuffers);
- if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adjusting " + current + " to consume " + consumedLength);
- }
- current.offset += consumedLength;
+ DiskRangeList next = addOneCompressionBlockByteBuffer(slice,
isUncompressed, cbStartOffset,
+ cbEndOffset, chunkLength, consumedLength, current, cache,
toDecompress, cacheBuffers);
if (compressed.remaining() <= 0 && zcr != null) {
toRelease.add(compressed);
}
- return consumedLength;
+ return next;
}
- if (current.end < cbEndOffset && !ranges.hasNext()) {
- return -1; // This is impossible to read from this chunk.
+ if (current.end < cbEndOffset && current.next == null) {
+ return null; // This is impossible to read from this chunk.
}
// TODO: we could remove extra copy for isUncompressed case by copying
directly to cache.
@@ -742,10 +740,11 @@ public abstract class InStream extends I
int remaining = chunkLength - compressed.remaining();
int originalPos = compressed.position();
copy.put(compressed);
- ranges.remove();
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("Removing " + current + " from ranges");
}
+ DiskRangeList next = current.next;
+ current.removeSelf();
if (zcr != null) {
if (originalPos == 0) {
zcr.releaseBuffer(compressed); // We copied the entire buffer.
@@ -754,40 +753,40 @@ public abstract class InStream extends I
}
}
- DiskRange nextRange = null;
- while (ranges.hasNext()) {
- nextRange = ranges.next();
- if (!(nextRange instanceof BufferChunk)) {
+ while (next != null) {
+ if (!(next instanceof BufferChunk)) {
throw new IOException("Trying to extend compressed block into
uncompressed block");
}
- compressed = nextRange.getData();
+ compressed = next.getData();
if (compressed.remaining() >= remaining) {
// This is the last range for this compression block. Yay!
slice = compressed.slice();
slice.limit(remaining);
copy.put(slice);
- addOneCompressionBlockByteBuffer(copy, isUncompressed, cbStartOffset,
- cbEndOffset, remaining, ranges, current, cache, toDecompress,
cacheBuffers);
+ next = addOneCompressionBlockByteBuffer(copy, isUncompressed,
cbStartOffset, cbEndOffset,
+ remaining, remaining, (BufferChunk)next, cache, toDecompress,
cacheBuffers);
if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adjusting " + nextRange + " to consume " + remaining);
+ LOG.info("Adjusting " + next + " to consume " + remaining);
}
- nextRange.offset += remaining;
if (compressed.remaining() <= 0 && zcr != null) {
zcr.releaseBuffer(compressed); // We copied the entire buffer.
}
- return consumedLength;
+ return next;
}
remaining -= compressed.remaining();
copy.put(compressed);
if (zcr != null) {
zcr.releaseBuffer(compressed); // We copied the entire buffer.
}
+ DiskRangeList tmp = next;
if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Removing " + nextRange + " from ranges");
+ LOG.info("Removing " + tmp + " from ranges");
}
- ranges.remove();
+ next = next.next;
+ tmp.removeSelf();
}
- return -1; // This is impossible to read from this chunk.
+ return null; // This is impossible to read from this chunk.
+ // TODO: dbl check this is valid; we just did a bunch of changes to the
list.
}
/**
@@ -797,17 +796,18 @@ public abstract class InStream extends I
* @param cbStartOffset Compressed start offset of the fCB.
* @param cbEndOffset Compressed end offset of the fCB.
* @param lastRange The buffer from which the last (or all) bytes of fCB
come.
- * @param lastPartLength The number of bytes consumed from lastRange into
fCB.
+ * @param lastPartChunkLength The number of compressed bytes consumed from
last *chunk* into fullCompressionBlock.
+ * @param lastPartConsumedLength The number of compressed bytes consumed
from last *range* into fullCompressionBlock.
+ * Can be different from lastPartChunkLength
due to header.
* @param ranges The iterator of all compressed ranges for the stream,
pointing at lastRange.
* @param lastChunk
* @param toDecompress See addOneCompressionBuffer.
* @param cacheBuffers See addOneCompressionBuffer.
*/
- private static void addOneCompressionBlockByteBuffer(ByteBuffer
fullCompressionBlock,
- boolean isUncompressed, long cbStartOffset, long cbEndOffset, int
lastPartLength,
- ListIterator<DiskRange> ranges, BufferChunk lastChunk,
- LowLevelCache cache, List<ProcCacheChunk> toDecompress,
- List<LlapMemoryBuffer> cacheBuffers) {
+ private static DiskRangeList addOneCompressionBlockByteBuffer(ByteBuffer
fullCompressionBlock,
+ boolean isUncompressed, long cbStartOffset, long cbEndOffset, int
lastPartChunkLength,
+ int lastPartConsumedLength, BufferChunk lastChunk, LowLevelCache cache,
+ List<ProcCacheChunk> toDecompress, List<LlapMemoryBuffer> cacheBuffers) {
// Prepare future cache buffer.
LlapMemoryBuffer futureAlloc = cache.createUnallocated();
// Add it to result in order we are processing.
@@ -817,21 +817,27 @@ public abstract class InStream extends I
cbStartOffset, cbEndOffset, !isUncompressed, fullCompressionBlock,
futureAlloc);
toDecompress.add(cc);
// Adjust the compression block position.
- lastChunk.chunk.position(lastChunk.chunk.position() + lastPartLength);
+ if (DebugUtils.isTraceOrcEnabled()) {
+ LOG.info("Adjusting " + lastChunk + " to consume " + lastPartChunkLength
+ + " compressed / " + lastPartConsumedLength + " total bytes");
+ }
+ lastChunk.chunk.position(lastChunk.chunk.position() + lastPartChunkLength);
+ lastChunk.offset += lastPartConsumedLength;
// Finally, put it in the ranges list for future use (if shared between
RGs).
// Before anyone else accesses it, it would have been allocated and
decompressed locally.
if (lastChunk.chunk.remaining() <= 0) {
if (DebugUtils.isTraceOrcEnabled()) {
LOG.info("Replacing " + lastChunk + " with " + cc + " in the buffers");
}
- ranges.set(cc);
+ assert lastChunk.offset == lastChunk.end;
+ lastChunk.replaceSelfWith(cc);
+ return cc.next;
} else {
- DiskRange before = ranges.previous();
if (DebugUtils.isTraceOrcEnabled()) {
- LOG.info("Adding " + cc + " before " + before + " in the buffers");
+ LOG.info("Adding " + cc + " before " + lastChunk + " in the buffers");
}
- ranges.add(cc);
- // At this point, next() should return before, which is the 2nd part of
the split buffer.
+ lastChunk.insertBefore(cc);
+ return lastChunk;
}
}
}
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
Sat Feb 21 08:01:06 2015
@@ -43,6 +43,8 @@ import org.apache.hadoop.fs.FSDataInputS
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.llap.io.api.cache.LlapMemoryBuffer;
@@ -103,7 +105,7 @@ public class RecordReaderImpl implements
private long rowCountInStripe = 0;
private final Map<StreamName, InStream> streams =
new HashMap<StreamName, InStream>();
- List<DiskRange> bufferChunks = new ArrayList<DiskRange>(0);
+ DiskRangeList bufferChunks = null;
private final TreeReader reader;
private final OrcProto.RowIndex[] indexes;
private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
@@ -2956,15 +2958,17 @@ public class RecordReaderImpl implements
for(InStream is: streams.values()) {
is.close();
}
- if(bufferChunks != null) {
+ if (bufferChunks != null) {
if (zcr != null) {
- for (DiskRange range : bufferChunks) {
+ DiskRangeList range = bufferChunks;
+ while (range != null) {
if (range instanceof BufferChunk) {
zcr.releaseBuffer(((BufferChunk)range).chunk);
}
+ range = range.next;
}
}
- bufferChunks.clear();
+ bufferChunks = null;
}
streams.clear();
}
@@ -3019,18 +3023,15 @@ public class RecordReaderImpl implements
return stripe;
}
- private void readAllDataStreams(StripeInformation stripe
- ) throws IOException {
+ private void readAllDataStreams(StripeInformation stripe) throws IOException
{
long start = stripe.getIndexLength();
long end = start + stripe.getDataLength();
// explicitly trigger 1 big read
- LinkedList<DiskRange> rangesToRead = Lists.newLinkedList();
- rangesToRead.add(new DiskRange(start, end));
+ DiskRangeList toRead = new DiskRangeList(start, end);
if (this.cache != null) {
- cache.getFileData(fileName, rangesToRead, stripe.getOffset());
+ toRead = cache.getFileData(fileName, toRead, stripe.getOffset());
}
- RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(),
rangesToRead, false);
- bufferChunks = rangesToRead;
+ bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr,
stripe.getOffset(), toRead, false);
List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
createStreams(
streamDescriptions, bufferChunks, null, codec, bufferSize, streams,
cache);
@@ -3041,7 +3042,7 @@ public class RecordReaderImpl implements
* The sections of stripe that we have read.
* This might not match diskRange - 1 disk range can be multiple buffer
chunks, depending on DFS block boundaries.
*/
- public static class BufferChunk extends DiskRange {
+ public static class BufferChunk extends DiskRangeList {
final ByteBuffer chunk;
BufferChunk(ByteBuffer chunk, long offset) {
@@ -3065,8 +3066,8 @@ public class RecordReaderImpl implements
public DiskRange slice(long offset, long end) {
assert offset <= end && offset >= this.offset && end <= this.end;
ByteBuffer sliceBuf = chunk.slice();
- int newPos = chunk.position() + (int)(offset - this.offset);
- int newLimit = chunk.limit() - chunk.position() - (int)(this.end - end);
+ int newPos = (int)(offset - this.offset);
+ int newLimit = newPos + (int)(end - offset);
sliceBuf.position(newPos);
sliceBuf.limit(newLimit);
return new BufferChunk(sliceBuf, offset);
@@ -3078,7 +3079,7 @@ public class RecordReaderImpl implements
}
}
- public static class CacheChunk extends DiskRange {
+ public static class CacheChunk extends DiskRangeList {
public LlapMemoryBuffer buffer;
public CacheChunk(LlapMemoryBuffer buffer, long offset, long end) {
@@ -3115,7 +3116,7 @@ public class RecordReaderImpl implements
* @param compressionSize the compression block size
* @return the list of disk ranges that will be loaded
*/
- static LinkedList<DiskRange> planReadPartialDataStreams
+ static DiskRangeList planReadPartialDataStreams
(List<OrcProto.Stream> streamList,
OrcProto.RowIndex[] indexes,
boolean[] includedColumns,
@@ -3123,12 +3124,12 @@ public class RecordReaderImpl implements
boolean isCompressed,
List<OrcProto.ColumnEncoding> encodings,
List<OrcProto.Type> types,
- int compressionSize) {
- LinkedList<DiskRange> result = new LinkedList<DiskRange>();
+ int compressionSize,
+ boolean doMergeBuffers) {
long offset = 0;
// figure out which columns have a present stream
boolean[] hasNull =
RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
- DiskRange lastRange = null;
+ DiskRangeListCreateHelper list = new DiskRangeListCreateHelper();
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
int column = stream.getColumn();
@@ -3140,16 +3141,16 @@ public class RecordReaderImpl implements
// if we aren't filtering or it is a dictionary, load it.
if (includedRowGroups == null
|| RecordReaderUtils.isDictionary(streamKind,
encodings.get(column))) {
- lastRange = RecordReaderUtils.addEntireStreamToRanges(offset,
length, lastRange, result);
+ RecordReaderUtils.addEntireStreamToRanges(offset, length, list,
doMergeBuffers);
} else {
- lastRange = RecordReaderUtils.addRgFilteredStreamToRanges(stream,
includedRowGroups,
+ RecordReaderUtils.addRgFilteredStreamToRanges(stream,
includedRowGroups,
isCompressed, indexes[column], encodings.get(column),
types.get(column),
- compressionSize, hasNull[column], offset, length, lastRange,
result);
+ compressionSize, hasNull[column], offset, length, list,
doMergeBuffers);
}
}
offset += length;
}
- return result;
+ return list.extract();
}
/**
@@ -3157,24 +3158,21 @@ public class RecordReaderImpl implements
* assumes that the ranges are sorted.
* @param ranges the list of disk ranges to merge
*/
- static void mergeDiskRanges(List<DiskRange> ranges) {
- DiskRange prev = null;
- for(int i=0; i < ranges.size(); ++i) {
- DiskRange current = ranges.get(i);
- if (prev != null && RecordReaderUtils.overlap(prev.offset, prev.end,
- current.offset, current.end)) {
- prev.offset = Math.min(prev.offset, current.offset);
- prev.end = Math.max(prev.end, current.end);
- ranges.remove(i);
- i -= 1;
+ static void mergeDiskRanges(DiskRangeList range) {
+ while (range != null && range.next != null) {
+ DiskRangeList next = range.next;
+ if (RecordReaderUtils.overlap(range.offset, range.end, next.offset,
next.end)) {
+ range.offset = Math.min(range.offset, next.offset);
+ range.end = Math.max(range.end, next.end);
+ range.removeAfter();
} else {
- prev = current;
+ range = next;
}
}
}
void createStreams(List<OrcProto.Stream> streamDescriptions,
- List<DiskRange> ranges,
+ DiskRangeList ranges,
boolean[] includeColumn,
CompressionCodec codec,
int bufferSize,
@@ -3205,21 +3203,19 @@ public class RecordReaderImpl implements
private void readPartialDataStreams(StripeInformation stripe) throws
IOException {
List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
- LinkedList<DiskRange> rangesToRead =
- planReadPartialDataStreams(streamList,
+ DiskRangeList toRead = planReadPartialDataStreams(streamList,
indexes, included, includedRowGroups, codec != null,
- stripeFooter.getColumnsList(), types, bufferSize);
+ stripeFooter.getColumnsList(), types, bufferSize, true);
if (LOG.isDebugEnabled()) {
- LOG.debug("chunks = " +
RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+ LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
}
- mergeDiskRanges(rangesToRead);
+ mergeDiskRanges(toRead);
if (this.cache != null) {
- cache.getFileData(fileName, rangesToRead, stripe.getOffset());
+ toRead = cache.getFileData(fileName, toRead, stripe.getOffset());
}
- RecordReaderUtils.readDiskRanges(file, zcr, stripe.getOffset(),
rangesToRead, false);
- bufferChunks = rangesToRead;
+ bufferChunks = RecordReaderUtils.readDiskRanges(file, zcr,
stripe.getOffset(), toRead, false);
if (LOG.isDebugEnabled()) {
- LOG.debug("merge = " +
RecordReaderUtils.stringifyDiskRanges(rangesToRead));
+ LOG.debug("merge = " +
RecordReaderUtils.stringifyDiskRanges(bufferChunks));
}
createStreams(streamList, bufferChunks, included, codec, bufferSize,
streams, cache);
Modified:
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
(original)
+++
hive/branches/llap/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderUtils.java
Sat Feb 21 08:01:06 2015
@@ -21,16 +21,16 @@ import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.TreeMap;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hive.common.DiskRange;
-import org.apache.hadoop.hive.llap.DebugUtils;
+import org.apache.hadoop.hive.common.DiskRangeList;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
+import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListMutateHelper;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.BufferChunk;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.hive.shims.HadoopShims.ByteBufferPoolShim;
@@ -68,24 +68,15 @@ public class RecordReaderUtils {
return rightB >= leftA;
}
-
- static DiskRange addEntireStreamToRanges(long offset, long length,
- DiskRange lastRange, LinkedList<DiskRange> result) {
- long end = offset + length;
- if (lastRange != null && overlap(lastRange.offset, lastRange.end, offset,
end)) {
- lastRange.offset = Math.min(lastRange.offset, offset);
- lastRange.end = Math.max(lastRange.end, end);
- } else {
- lastRange = new DiskRange(offset, end);
- result.add(lastRange);
- }
- return lastRange;
+ static void addEntireStreamToRanges(
+ long offset, long length, DiskRangeListCreateHelper list, boolean
doMergeBuffers) {
+ list.addOrMerge(offset, offset + length, doMergeBuffers, false);
}
- static DiskRange addRgFilteredStreamToRanges(OrcProto.Stream stream,
+ static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex
index,
OrcProto.ColumnEncoding encoding, OrcProto.Type type, int
compressionSize, boolean hasNull,
- long offset, long length, DiskRange lastRange, LinkedList<DiskRange>
result) {
+ long offset, long length, DiskRangeListCreateHelper list, boolean
doMergeBuffers) {
for (int group = 0; group < includedRowGroups.length; ++group) {
if (!includedRowGroups[group]) continue;
int posn = getIndexPosition(
@@ -98,19 +89,8 @@ public class RecordReaderUtils {
start += offset;
long end = offset + estimateRgEndOffset(
isCompressed, isLast, nextGroupOffset, length, compressionSize);
- if (lastRange != null && overlap(lastRange.offset, lastRange.end, start,
end)) {
- lastRange.offset = Math.min(lastRange.offset, start);
- lastRange.end = Math.max(lastRange.end, end);
- } else {
- if (DebugUtils.isTraceOrcEnabled()) {
- RecordReaderImpl.LOG.info("Creating new range for RG read; last
range (which can "
- + "include some previous RGs) was " + lastRange);
- }
- lastRange = new DiskRange(start, end);
- result.add(lastRange);
- }
+ list.addOrMerge(start, end, doMergeBuffers, true);
}
- return lastRange;
}
static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
@@ -218,14 +198,17 @@ public class RecordReaderUtils {
* @param ranges ranges to stringify
* @return the resulting string
*/
- static String stringifyDiskRanges(List<DiskRange> ranges) {
+ static String stringifyDiskRanges(DiskRangeList range) {
StringBuilder buffer = new StringBuilder();
buffer.append("[");
- for(int i=0; i < ranges.size(); ++i) {
- if (i != 0) {
+ boolean isFirst = true;
+ while (range != null) {
+ if (!isFirst) {
buffer.append(", ");
}
- buffer.append(ranges.get(i).toString());
+ isFirst = false;
+ buffer.append(range.toString());
+ range = range.next;
}
buffer.append("]");
return buffer.toString();
@@ -240,15 +223,21 @@ public class RecordReaderUtils {
* ranges
* @throws IOException
*/
- static void readDiskRanges(FSDataInputStream file,
+ static DiskRangeList readDiskRanges(FSDataInputStream file,
ZeroCopyReaderShim zcr,
long base,
- LinkedList<DiskRange> ranges,
+ DiskRangeList range,
boolean doForceDirect) throws IOException {
- ListIterator<DiskRange> rangeIter = ranges.listIterator();
- while (rangeIter.hasNext()) {
- DiskRange range = rangeIter.next();
- if (range.hasData()) continue;
+ if (range == null) return null;
+ DiskRangeList prev = range.prev;
+ if (prev == null) {
+ prev = new DiskRangeListMutateHelper(range);
+ }
+ while (range != null) {
+ if (range.hasData()) {
+ range = range.next;
+ continue;
+ }
int len = (int) (range.end - range.offset);
long off = range.offset;
file.seek(base + off);
@@ -258,11 +247,12 @@ public class RecordReaderUtils {
ByteBuffer partial = zcr.readBuffer(len, false);
BufferChunk bc = new BufferChunk(partial, off);
if (!hasReplaced) {
- rangeIter.set(bc);
+ range.replaceSelfWith(bc);
hasReplaced = true;
} else {
- rangeIter.add(bc);
+ range.insertAfter(bc);
}
+ range = bc;
int read = partial.remaining();
len -= read;
off += read;
@@ -282,24 +272,29 @@ public class RecordReaderUtils {
directBuf.put(buffer);
}
directBuf.position(0);
- rangeIter.set(new BufferChunk(directBuf, range.offset));
+ range = range.replaceSelfWith(new BufferChunk(directBuf,
range.offset));
} else {
byte[] buffer = new byte[len];
file.readFully(buffer, 0, buffer.length);
- rangeIter.set(new BufferChunk(ByteBuffer.wrap(buffer), range.offset));
+ range = range.replaceSelfWith(new BufferChunk(ByteBuffer.wrap(buffer),
range.offset));
}
+ range = range.next;
}
+ return prev.next;
}
- static List<DiskRange> getStreamBuffers(List<DiskRange> ranges, long offset,
long length) {
+ static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset,
long length) {
// This assumes sorted ranges (as do many other parts of ORC code.
ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
long streamEnd = offset + length;
boolean inRange = false;
- for (DiskRange range : ranges) {
+ while (range != null) {
if (!inRange) {
- if (range.end <= offset) continue; // Skip until we are in range.
+ if (range.end <= offset) {
+ range = range.next;
+ continue; // Skip until we are in range.
+ }
inRange = true;
if (range.offset < offset) {
// Partial first buffer, add a slice of it.
@@ -307,6 +302,7 @@ public class RecordReaderUtils {
partial.shiftBy(-offset);
buffers.add(partial);
if (range.end >= streamEnd) break; // Partial first buffer is also
partial last buffer.
+ range = range.next;
continue;
}
} else if (range.offset >= streamEnd) {
@@ -326,6 +322,7 @@ public class RecordReaderUtils {
full.shiftBy(-offset);
buffers.add(full);
if (range.end == streamEnd) break;
+ range = range.next;
}
return buffers;
}
Modified:
hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
URL:
http://svn.apache.org/viewvc/hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java?rev=1661297&r1=1661296&r2=1661297&view=diff
==============================================================================
---
hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
(original)
+++
hive/branches/llap/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestRecordReaderImpl.java
Sat Feb 21 08:01:06 2015
@@ -20,8 +20,7 @@ package org.apache.hadoop.hive.ql.io.orc
import static junit.framework.Assert.assertEquals;
import static org.hamcrest.core.Is.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.IOException;
import java.io.InputStream;
@@ -36,7 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.Seekable;
-import org.apache.hadoop.hive.common.DiskRange;
+import org.apache.hadoop.hive.common.DiskRangeList;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.io.filters.BloomFilter;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.Location;
@@ -788,21 +787,22 @@ public class TestRecordReaderImpl {
assertTrue(!RecordReaderUtils.overlap(0, 10, 11, 12));
}
- private static List<DiskRange> diskRanges(Integer... points) {
- List<DiskRange> result =
- new ArrayList<DiskRange>();
- for(int i=0; i < points.length; i += 2) {
- result.add(new DiskRange(points[i], points[i+1]));
+ private static DiskRangeList diskRanges(Integer... points) {
+ DiskRangeList head = null, tail = null;
+ for(int i = 0; i < points.length; i += 2) {
+ DiskRangeList range = new DiskRangeList(points[i], points[i+1]);
+ if (tail == null) {
+ head = tail = range;
+ } else {
+ tail = tail.insertAfter(range);
+ }
}
- return result;
+ return head;
}
@Test
public void testMergeDiskRanges() throws Exception {
- List<DiskRange> list = diskRanges();
- RecordReaderImpl.mergeDiskRanges(list);
- assertThat(list, is(diskRanges()));
- list = diskRanges(100, 200, 300, 400, 500, 600);
+ DiskRangeList list = diskRanges(100, 200, 300, 400, 500, 600);
RecordReaderImpl.mergeDiskRanges(list);
assertThat(list, is(diskRanges(100, 200, 300, 400, 500, 600)));
list = diskRanges(100, 200, 150, 300, 400, 500);
@@ -879,7 +879,7 @@ public class TestRecordReaderImpl {
@Test
public void testPartialPlan() throws Exception {
- List<DiskRange> result;
+ DiskRangeList result;
// set the streams
List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -948,30 +948,41 @@ public class TestRecordReaderImpl {
// filter by rows and groups
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, false, encodings, types, 32768);
+ columns, rowGroups, false, encodings, types, 32768, false);
assertThat(result, is(diskRanges(0, 1000, 100, 1000, 400, 1000,
1000, 11000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
11000, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
+ result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+ columns, rowGroups, false, encodings, types, 32768, true);
+ assertThat(result, is(diskRanges(0, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
+ 41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
// if we read no rows, don't read any bytes
rowGroups = new boolean[]{false, false, false, false, false, false};
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, false, encodings, types, 32768);
- assertThat(result, is(diskRanges()));
+ columns, rowGroups, false, encodings, types, 32768, false);
+ assertNull(result);
// all rows, but only columns 0 and 2.
rowGroups = null;
columns = new boolean[]{true, false, true};
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, null, false, encodings, types, 32768);
+ columns, null, false, encodings, types, 32768, false);
assertThat(result, is(diskRanges(100000, 102000, 102000, 200000)));
+ result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+ columns, null, false, encodings, types, 32768, true);
+ assertThat(result, is(diskRanges(100000, 200000)));
rowGroups = new boolean[]{false, true, false, false, false, false};
indexes[2] = indexes[1];
indexes[1] = null;
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, false, encodings, types, 32768);
+ columns, rowGroups, false, encodings, types, 32768, false);
+ assertThat(result, is(diskRanges(100100, 102000,
+ 112000, 122000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
+ result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+ columns, rowGroups, false, encodings, types, 32768, true);
assertThat(result, is(diskRanges(100100, 102000,
112000, 122000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP)));
@@ -979,7 +990,11 @@ public class TestRecordReaderImpl {
indexes[1] = indexes[2];
columns = new boolean[]{true, true, true};
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, false, encodings, types, 32768);
+ columns, rowGroups, false, encodings, types, 32768, false);
+ assertThat(result, is(diskRanges(500, 1000, 51000, 100000, 100500, 102000,
+ 152000, 200000)));
+ result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
+ columns, rowGroups, false, encodings, types, 32768, true);
assertThat(result, is(diskRanges(500, 1000, 51000, 100000, 100500, 102000,
152000, 200000)));
}
@@ -987,7 +1002,7 @@ public class TestRecordReaderImpl {
@Test
public void testPartialPlanCompressed() throws Exception {
- List<DiskRange> result;
+ DiskRangeList result;
// set the streams
List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -1056,20 +1071,20 @@ public class TestRecordReaderImpl {
// filter by rows and groups
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, true, encodings, types, 32768);
+ columns, rowGroups, true, encodings, types, 32768, false);
assertThat(result, is(diskRanges(0, 1000, 100, 1000,
400, 1000, 1000, 11000+(2*32771),
11000, 21000+(2*32771), 41000, 100000)));
rowGroups = new boolean[]{false, false, false, false, false, true};
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, true, encodings, types, 32768);
+ columns, rowGroups, true, encodings, types, 32768, false);
assertThat(result, is(diskRanges(500, 1000, 51000, 100000)));
}
@Test
public void testPartialPlanString() throws Exception {
- List<DiskRange> result;
+ DiskRangeList result;
// set the streams
List<OrcProto.Stream> streams = new ArrayList<OrcProto.Stream>();
@@ -1144,7 +1159,7 @@ public class TestRecordReaderImpl {
// filter by rows and groups
result = RecordReaderImpl.planReadPartialDataStreams(streams, indexes,
- columns, rowGroups, false, encodings, types, 32768);
+ columns, rowGroups, false, encodings, types, 32768, false);
assertThat(result, is(diskRanges(100, 1000, 400, 1000, 500, 1000,
11000, 21000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,
41000, 51000 + RecordReaderUtils.WORST_UNCOMPRESSED_SLOP,