This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new 4aa267f54 ORC-1138: Optimizes the read of streams in ORC by combining
multiple nearby reads into a single read, optionally allowing the retention or
drop of the extra bytes
4aa267f54 is described below
commit 4aa267f54980738049454d8c1ea85a5aea7c968c
Author: Pavan Lanka <[email protected]>
AuthorDate: Mon Mar 28 09:23:17 2022 -0700
ORC-1138: Optimizes the read of streams in ORC by combining multiple nearby
reads into a single read, optionally allowing the retention or drop of the
extra bytes
We are introducing two new configuration parameters that control how read
of streams takes place in ORC
* minSeekSize: If separation between multiple reads is within minSeekSize
then these are combined into a single read
* minSeekSizeTolerance: Helps in the decisioning of whether to retain the
extra bytes (extra memory) or take extra CPU to drop the unwanted bytes
This leads to significant time savings (and cost also) when dealing with
AWS S3. Reads with gaps e.g. reading alternate columns shows a significant
penalty 5.8s vs 1.4s with the patch.
* New Unit Tests were added
* None of the existing tests were changed
Closes #1072
Signed-off-by: Dongjoon Hyun <[email protected]>
---
java/core/src/java/org/apache/orc/OrcConf.java | 13 ++
java/core/src/java/org/apache/orc/Reader.java | 35 ++++
.../org/apache/orc/impl/DataReaderProperties.java | 25 +++
.../java/org/apache/orc/impl/RecordReaderImpl.java | 4 +-
.../org/apache/orc/impl/RecordReaderUtils.java | 216 ++++++++++++++++++++-
.../src/test/org/apache/orc/TestMinSeekSize.java | 210 ++++++++++++++++++++
.../org/apache/orc/impl/TestRecordReaderUtils.java | 178 +++++++++++++++++
7 files changed, 675 insertions(+), 6 deletions(-)
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java
b/java/core/src/java/org/apache/orc/OrcConf.java
index 6b6441527..25bd8b973 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -194,6 +194,19 @@ public enum OrcConf {
ORC_MAX_DISK_RANGE_CHUNK_LIMIT("orc.max.disk.range.chunk.limit",
"hive.exec.orc.max.disk.range.chunk.limit",
Integer.MAX_VALUE - 1024, "When reading stripes >2GB, specify max limit
for the chunk size."),
+ ORC_MIN_DISK_SEEK_SIZE("orc.min.disk.seek.size",
+ "orc.min.disk.seek.size",
+ 0,
+ "When determining contiguous reads, gaps within this
size are "
+ + "read contiguously and not seeked. Default value of
zero disables this "
+ + "optimization"),
+ ORC_MIN_DISK_SEEK_SIZE_TOLERANCE("orc.min.disk.seek.size.tolerance",
+ "orc.min.disk.seek.size.tolerance", 0.00,
+ "Define the tolerance for for extra bytes read as a
result of "
+ + "orc.min.disk.seek.size. If the "
+ + "(bytesRead - bytesNeeded) / bytesNeeded is
greater than this "
+ + "threshold then extra work is performed to drop
the extra bytes from "
+ + "memory after the read."),
ENCRYPTION("orc.encrypt", "orc.encrypt", null, "The list of keys and columns
to encrypt with"),
DATA_MASK("orc.mask", "orc.mask", null, "The masks to apply to the encrypted
columns"),
KEY_PROVIDER("orc.key.provider", "orc.key.provider", "hadoop",
diff --git a/java/core/src/java/org/apache/orc/Reader.java
b/java/core/src/java/org/apache/orc/Reader.java
index 0b88599f2..56b14393a 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -234,6 +234,9 @@ public interface Reader extends Closeable {
private boolean allowSARGToFilter = false;
private boolean useSelected = false;
private boolean allowPluginFilters = false;
+ private int minSeekSize = (int)
OrcConf.ORC_MIN_DISK_SEEK_SIZE.getDefaultValue();
+ private double minSeekSizeTolerance = (double)
OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE
+ .getDefaultValue();
/**
* @since 1.1.0
@@ -256,6 +259,8 @@ public interface Reader extends Closeable {
allowSARGToFilter = OrcConf.ALLOW_SARG_TO_FILTER.getBoolean(conf);
useSelected = OrcConf.READER_USE_SELECTED.getBoolean(conf);
allowPluginFilters = OrcConf.ALLOW_PLUGIN_FILTER.getBoolean(conf);
+ minSeekSize = OrcConf.ORC_MIN_DISK_SEEK_SIZE.getInt(conf);
+ minSeekSizeTolerance =
OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE.getDouble(conf);
}
/**
@@ -648,6 +653,36 @@ public interface Reader extends Closeable {
this.allowPluginFilters = allowPluginFilters;
return this;
}
+
+ /**
+ * @since 1.8.0
+ */
+ public int minSeekSize() {
+ return minSeekSize;
+ }
+
+ /**
+ * @since 1.8.0
+ */
+ public Options minSeekSize(int minSeekSize) {
+ this.minSeekSize = minSeekSize;
+ return this;
+ }
+
+ /**
+ * @since 1.8.0
+ */
+ public double minSeekSizeTolerance() {
+ return minSeekSizeTolerance;
+ }
+
+ /**
+ * @since 1.8.0
+ */
+ public Options minSeekSizeTolerance(double value) {
+ this.minSeekSizeTolerance = value;
+ return this;
+ }
}
/**
diff --git a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
index 3bc5876ce..19a537c6b 100644
--- a/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
+++ b/java/core/src/java/org/apache/orc/impl/DataReaderProperties.java
@@ -32,6 +32,8 @@ public final class DataReaderProperties {
private final InStream.StreamOptions compression;
private final boolean zeroCopy;
private final int maxDiskRangeChunkLimit;
+ private final int minSeekSize;
+ private final double minSeekSizeTolerance;
private DataReaderProperties(Builder builder) {
this.fileSystemSupplier = builder.fileSystemSupplier;
@@ -40,6 +42,8 @@ public final class DataReaderProperties {
this.compression = builder.compression;
this.zeroCopy = builder.zeroCopy;
this.maxDiskRangeChunkLimit = builder.maxDiskRangeChunkLimit;
+ this.minSeekSize = builder.minSeekSize;
+ this.minSeekSizeTolerance = builder.minSeekSizeTolerance;
}
public Supplier<FileSystem> getFileSystemSupplier() {
@@ -70,6 +74,14 @@ public final class DataReaderProperties {
return new Builder();
}
+ public int getMinSeekSize() {
+ return minSeekSize;
+ }
+
+ public double getMinSeekSizeTolerance() {
+ return minSeekSizeTolerance;
+ }
+
public static class Builder {
private Supplier<FileSystem> fileSystemSupplier;
@@ -79,6 +91,9 @@ public final class DataReaderProperties {
private boolean zeroCopy;
private int maxDiskRangeChunkLimit =
(int) OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getDefaultValue();
+ private int minSeekSize = (int)
OrcConf.ORC_MIN_DISK_SEEK_SIZE.getDefaultValue();
+ private double minSeekSizeTolerance = (double)
OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE
+ .getDefaultValue();
private Builder() {
@@ -119,6 +134,16 @@ public final class DataReaderProperties {
return this;
}
+ public Builder withMinSeekSize(int value) {
+ minSeekSize = value;
+ return this;
+ }
+
+ public Builder withMinSeekSizeTolerance(double value) {
+ minSeekSizeTolerance = value;
+ return this;
+ }
+
public DataReaderProperties build() {
if (fileSystemSupplier == null || path == null) {
throw new NullPointerException("Filesystem = " + fileSystemSupplier +
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
index ade91c6e3..e1d37360a 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderImpl.java
@@ -281,7 +281,9 @@ public class RecordReaderImpl implements RecordReader {
.withFileSystemSupplier(fileReader.getFileSystemSupplier())
.withPath(fileReader.path)
.withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
- .withZeroCopy(zeroCopy);
+ .withZeroCopy(zeroCopy)
+ .withMinSeekSize(options.minSeekSize())
+ .withMinSeekSizeTolerance(options.minSeekSizeTolerance());
FSDataInputStream file = fileReader.takeFile();
if (file != null) {
builder.withFile(file);
diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
index a39e1ed13..19902d299 100644
--- a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
+++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java
@@ -53,6 +53,8 @@ public class RecordReaderUtils {
private final Supplier<FileSystem> fileSystemSupplier;
private final Path path;
private final boolean useZeroCopy;
+ private final int minSeekSize;
+ private final double minSeekSizeTolerance;
private InStream.StreamOptions options;
private boolean isOpen = false;
@@ -62,6 +64,8 @@ public class RecordReaderUtils {
this.file = properties.getFile();
this.useZeroCopy = properties.getZeroCopy();
this.options = properties.getCompression();
+ this.minSeekSize = properties.getMinSeekSize();
+ this.minSeekSizeTolerance = properties.getMinSeekSizeTolerance();
}
@Override
@@ -99,7 +103,8 @@ public class RecordReaderUtils {
public BufferChunkList readFileData(BufferChunkList range,
boolean doForceDirect
) throws IOException {
- RecordReaderUtils.readDiskRanges(file, zcr, range, doForceDirect);
+ RecordReaderUtils.readDiskRanges(file, zcr, range, doForceDirect,
minSeekSize,
+ minSeekSizeTolerance);
return range;
}
@@ -459,11 +464,23 @@ public class RecordReaderUtils {
* @return the last range to read
*/
static BufferChunk findSingleRead(BufferChunk first) {
+ return findSingleRead(first, 0);
+ }
+
+ /**
+ * Find the list of ranges that should be read in a single read.
+ * The read will stop when there is a gap, one of the ranges already has
data,
+ * or we have reached the maximum read size of 2^31.
+ * @param first the first range to read
+ * @param minSeekSize minimum size for seek instead of read
+ * @return the last range to read
+ */
+ private static BufferChunk findSingleRead(BufferChunk first, long
minSeekSize) {
BufferChunk last = first;
long currentEnd = first.getEnd();
while (last.next != null &&
!last.next.hasData() &&
- last.next.getOffset() <= currentEnd &&
+ last.next.getOffset() <= (currentEnd + minSeekSize) &&
last.next.getEnd() - first.getOffset() < Integer.MAX_VALUE) {
last = (BufferChunk) last.next;
currentEnd = Math.max(currentEnd, last.getEnd());
@@ -486,18 +503,43 @@ public class RecordReaderUtils {
HadoopShims.ZeroCopyReaderShim zcr,
BufferChunkList list,
boolean doForceDirect) throws IOException {
+ readDiskRanges(file, zcr, list, doForceDirect, 0, 0);
+ }
+
+ /**
+ * Read the list of ranges from the file by updating each range in the list
+ * with a buffer that has the bytes from the file.
+ *
+ * The ranges must be sorted, but may overlap or include holes.
+ *
+ * @param file the file to read
+ * @param zcr the zero copy shim
+ * @param list the disk ranges within the file to read
+ * @param doForceDirect allocate direct buffers
+ * @param minSeekSize the minimum gap to prefer seek vs read
+ * @param minSeekSizeTolerance allowed tolerance for extra bytes in memory
as a result of
+ * minSeekSize
+ */
+ private static void readDiskRanges(FSDataInputStream file,
+ HadoopShims.ZeroCopyReaderShim zcr,
+ BufferChunkList list,
+ boolean doForceDirect,
+ int minSeekSize,
+ double minSeekSizeTolerance) throws IOException {
BufferChunk current = list == null ? null : list.get();
while (current != null) {
while (current.hasData()) {
current = (BufferChunk) current.next;
}
- BufferChunk last = findSingleRead(current);
if (zcr != null) {
+ BufferChunk last = findSingleRead(current);
zeroCopyReadRanges(file, zcr, current, last, doForceDirect);
+ current = (BufferChunk) last.next;
} else {
- readRanges(file, current, last, doForceDirect);
+ ChunkReader chunkReader = ChunkReader.create(current, minSeekSize);
+ chunkReader.readRanges(file, doForceDirect, minSeekSizeTolerance);
+ current = (BufferChunk) chunkReader.to.next;
}
- current = (BufferChunk) last.next;
}
}
@@ -584,4 +626,168 @@ public class RecordReaderUtils {
} while (tree.putIfAbsent(key, buffer) != null);
}
}
+
+ static class ChunkReader {
+ private final BufferChunk from;
+ private final BufferChunk to;
+ private final int readBytes;
+ private final int reqBytes;
+
+ private ChunkReader(BufferChunk from, BufferChunk to, int readSize, int
reqBytes) {
+ this.from = from;
+ this.to = to;
+ this.readBytes = readSize;
+ this.reqBytes = reqBytes;
+ }
+
+ double getExtraBytesFraction() {
+ return (readBytes - reqBytes) / ((double) reqBytes);
+ }
+
+ public int getReadBytes() {
+ return readBytes;
+ }
+
+ public int getReqBytes() {
+ return reqBytes;
+ }
+
+ public BufferChunk getFrom() {
+ return from;
+ }
+
+ public BufferChunk getTo() {
+ return to;
+ }
+
+ void populateChunks(ByteBuffer bytes, boolean allocateDirect, double
extraByteTolerance) {
+ if (getExtraBytesFraction() > extraByteTolerance) {
+ LOG.debug("ExtraBytesFraction = {}, ExtraByteTolerance = {}, reducing
memory size",
+ getExtraBytesFraction(),
+ extraByteTolerance);
+ populateChunksReduceSize(bytes, allocateDirect);
+ } else {
+ LOG.debug("ExtraBytesFraction = {}, ExtraByteTolerance = {},
populating as is",
+ getExtraBytesFraction(),
+ extraByteTolerance);
+ populateChunksAsIs(bytes);
+ }
+ }
+
+ void populateChunksAsIs(ByteBuffer bytes) {
+ // populate each BufferChunks with the data
+ BufferChunk current = from;
+ long offset = from.getOffset();
+ while (current != to.next) {
+ ByteBuffer currentBytes = current == to ? bytes : bytes.duplicate();
+ currentBytes.position((int) (current.getOffset() - offset));
+ currentBytes.limit((int) (current.getEnd() - offset));
+ current.setChunk(currentBytes);
+ current = (BufferChunk) current.next;
+ }
+ }
+
+ void populateChunksReduceSize(ByteBuffer bytes, boolean allocateDirect) {
+ ByteBuffer newBuffer;
+ if (allocateDirect) {
+ newBuffer = ByteBuffer.allocateDirect(reqBytes);
+ newBuffer.position(reqBytes);
+ newBuffer.flip();
+ } else {
+ byte[] newBytes = new byte[reqBytes];
+ newBuffer = ByteBuffer.wrap(newBytes);
+ }
+
+ final long offset = from.getOffset();
+ int copyStart = 0;
+ int copyEnd;
+ int copyLength;
+ int skippedBytes = 0;
+ int srcPosition;
+ BufferChunk current = from;
+ while (current != to.next) {
+ // We can skip bytes as required, but no need to copy bytes that are
already copied
+ srcPosition = (int) (current.getOffset() - offset);
+ skippedBytes += Math.max(0, srcPosition - copyStart);
+ copyStart = Math.max(copyStart, srcPosition);
+ copyEnd = (int) (current.getEnd() - offset);
+ copyLength = copyStart < copyEnd ? copyEnd - copyStart : 0;
+ newBuffer.put(bytes.array(), copyStart, copyLength);
+ copyStart += copyLength;
+ // Set up new ByteBuffer that wraps on the same backing array
+ ByteBuffer currentBytes = current == to ? newBuffer :
newBuffer.duplicate();
+ currentBytes.position(srcPosition - skippedBytes);
+ currentBytes.limit(currentBytes.position() + current.getLength());
+ current.setChunk(currentBytes);
+ current = (BufferChunk) current.next;
+ }
+ }
+
+ /**
+ * Read the data from the file based on a list of ranges in a single read.
+ *
+ * @param file the file to read from
+ * @param allocateDirect should we use direct buffers
+ */
+ void readRanges(FSDataInputStream file, boolean allocateDirect, double
extraByteTolerance)
+ throws IOException {
+ // assume that the chunks are sorted by offset
+ long offset = from.getOffset();
+ int readSize = (int) (computeEnd(from, to) - offset);
+ byte[] buffer = new byte[readSize];
+ try {
+ file.readFully(offset, buffer, 0, buffer.length);
+ } catch (IOException e) {
+ throw new IOException(String.format("Failed while reading %s %d:%d",
+ file,
+ offset,
+ buffer.length),
+ e);
+ }
+
+ // get the data into a ByteBuffer
+ ByteBuffer bytes;
+ if (allocateDirect) {
+ bytes = ByteBuffer.allocateDirect(readSize);
+ bytes.put(buffer);
+ bytes.flip();
+ } else {
+ bytes = ByteBuffer.wrap(buffer);
+ }
+
+ // populate each BufferChunks with the data
+ populateChunks(bytes, allocateDirect, extraByteTolerance);
+ }
+
+ static ChunkReader create(BufferChunk from, BufferChunk to) {
+ long f = Integer.MAX_VALUE;
+ long e = Integer.MIN_VALUE;
+
+ long cf = Integer.MAX_VALUE;
+ long ef = Integer.MIN_VALUE;
+ int reqBytes = 0;
+
+ BufferChunk current = from;
+ while (current != to.next) {
+ f = Math.min(f, current.getOffset());
+ e = Math.max(e, current.getEnd());
+ if (ef == Integer.MIN_VALUE || current.getOffset() <= ef) {
+ cf = Math.min(cf, current.getOffset());
+ ef = Math.max(ef, current.getEnd());
+ } else {
+ reqBytes += ef - cf;
+ cf = current.getOffset();
+ ef = current.getEnd();
+ }
+ current = (BufferChunk) current.next;
+ }
+ reqBytes += ef - cf;
+ return new ChunkReader(from, to, (int) (e - f), reqBytes);
+ }
+
+ static ChunkReader create(BufferChunk from, int minSeekSize) {
+ BufferChunk to = findSingleRead(from, minSeekSize);
+ return create(from, to);
+ }
+ }
}
diff --git a/java/core/src/test/org/apache/orc/TestMinSeekSize.java
b/java/core/src/test/org/apache/orc/TestMinSeekSize.java
new file mode 100644
index 000000000..20f26f0d9
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/TestMinSeekSize.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Random;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestMinSeekSize {
+ private static final Logger LOG =
LoggerFactory.getLogger(TestMinSeekSize.class);
+ private static final Path workDir = new
Path(System.getProperty("test.tmp.dir",
+ "target" +
File.separator + "test"
+ +
File.separator + "tmp"));
+ private static final Path filePath = new Path(workDir,
"min_seek_size_file.orc");
+ private static Configuration conf;
+ private static FileSystem fs;
+
+ private static final TypeDescription schema = TypeDescription.createStruct()
+ .addField("f1", TypeDescription.createLong())
+ .addField("f2",
TypeDescription.createDecimal().withPrecision(20).withScale(6))
+ .addField("f3", TypeDescription.createLong())
+ .addField("f4", TypeDescription.createString())
+ .addField("ridx", TypeDescription.createLong());
+ private static final boolean[] AlternateColumns = new boolean[] {true, true,
false, true, false
+ , true};
+ private static final long RowCount = 16384;
+ private static final int scale = 3;
+
+ @BeforeAll
+ public static void setup() throws IOException {
+ conf = new Configuration();
+ fs = FileSystem.get(conf);
+
+ LOG.info("Creating file {} with schema {}", filePath, schema);
+ try (Writer writer = OrcFile.createWriter(filePath,
+ OrcFile.writerOptions(conf)
+ .fileSystem(fs)
+ .overwrite(true)
+ .rowIndexStride(8192)
+ .setSchema(schema))) {
+ Random rnd = new Random(1024);
+ VectorizedRowBatch b = schema.createRowBatch();
+ for (int rowIdx = 0; rowIdx < RowCount; rowIdx++) {
+ long v = rnd.nextLong();
+ for (int colIdx = 0; colIdx < schema.getChildren().size() - 1;
colIdx++) {
+ switch (schema.getChildren().get(colIdx).getCategory()) {
+ case LONG:
+ ((LongColumnVector) b.cols[colIdx]).vector[b.size] = v;
+ break;
+ case DECIMAL:
+ HiveDecimalWritable d = new HiveDecimalWritable();
+ d.setFromLongAndScale(v, scale);
+ ((DecimalColumnVector) b.cols[colIdx]).vector[b.size] = d;
+ break;
+ case STRING:
+ ((BytesColumnVector) b.cols[colIdx]).setVal(b.size,
+ String.valueOf(v)
+
.getBytes(StandardCharsets.UTF_8));
+ break;
+ default:
+ throw new IllegalArgumentException();
+ }
+ }
+ // Populate the rowIdx
+ ((LongColumnVector) b.cols[4]).vector[b.size] = rowIdx;
+
+ b.size += 1;
+ if (b.size == b.getMaxSize()) {
+ writer.addRowBatch(b);
+ b.reset();
+ }
+ }
+ if (b.size > 0) {
+ writer.addRowBatch(b);
+ b.reset();
+ }
+ }
+ LOG.info("Created file {}", filePath);
+ }
+
+ @Test
+ public void writeIsSuccessful() throws IOException {
+ Reader r = OrcFile.createReader(filePath,
OrcFile.readerOptions(conf).filesystem(fs));
+ assertEquals(RowCount, r.getNumberOfRows());
+ }
+
+ private long validateFilteredRecordReader(RecordReader rr,
VectorizedRowBatch b)
+ throws IOException {
+ long rowCount = 0;
+ while (rr.nextBatch(b)) {
+ validateBatch(b, rowCount);
+ rowCount += b.size;
+ }
+ return rowCount;
+ }
+
+ private void validateColumnNull(VectorizedRowBatch b, int colIdx) {
+ assertFalse(b.cols[colIdx].noNulls);
+ assertTrue(b.cols[colIdx].isRepeating);
+ assertTrue(b.cols[colIdx].isNull[0]);
+ }
+
+ private void validateBatch(VectorizedRowBatch b, long expRowNum) {
+ HiveDecimalWritable d = new HiveDecimalWritable();
+ validateColumnNull(b, 1);
+ validateColumnNull(b, 3);
+ for (int i = 0; i < b.size; i++) {
+ int rowIdx;
+ if (b.selectedInUse) {
+ rowIdx = b.selected[i];
+ } else {
+ rowIdx = i;
+ }
+ long expValue = ((LongColumnVector) b.cols[0]).vector[rowIdx];
+ d.setFromLongAndScale(expValue, scale);
+ assertEquals(expValue, ((LongColumnVector) b.cols[2]).vector[rowIdx]);
+ if (expRowNum != -1) {
+ assertEquals(expRowNum + i, ((LongColumnVector)
b.cols[4]).vector[rowIdx]);
+ }
+ }
+ }
+
+ @Test
+ public void readAlternateColumnsWOMinSeekSize() throws IOException {
+ readStart();
+ OrcConf.ORC_MIN_DISK_SEEK_SIZE.setInt(conf, 0);
+ Reader r = OrcFile.createReader(filePath,
OrcFile.readerOptions(conf).filesystem(fs));
+ Reader.Options opts = r.options().include(AlternateColumns);
+ VectorizedRowBatch b = schema.createRowBatch();
+ long rowCount;
+ try (RecordReader rr = r.rows(opts)) {
+ rowCount = validateFilteredRecordReader(rr, b);
+ }
+ FileSystem.Statistics stats = readEnd();
+ double p = readPercentage(stats, fs.getFileStatus(filePath).getLen());
+ assertEquals(RowCount, rowCount);
+ assertTrue(p < 60);
+ }
+
+ @Test
+ public void readAlternateColumnsWMinSeekSize() throws IOException {
+ readStart();
+ OrcConf.ORC_MIN_DISK_SEEK_SIZE.setInt(conf, 1024 * 1024);
+ Reader r = OrcFile.createReader(filePath,
OrcFile.readerOptions(conf).filesystem(fs));
+ Reader.Options opts = r.options().include(AlternateColumns);
+ assertEquals(opts.minSeekSize(), 1024 * 1024);
+ VectorizedRowBatch b = schema.createRowBatch();
+ long rowCount;
+ try (RecordReader rr = r.rows(opts)) {
+ rowCount = validateFilteredRecordReader(rr, b);
+ }
+ FileSystem.Statistics stats = readEnd();
+ double p = readPercentage(stats, fs.getFileStatus(filePath).getLen());
+ assertEquals(RowCount, rowCount);
+ // Read all bytes
+ assertTrue(p >= 100);
+ }
+
+ private double readPercentage(FileSystem.Statistics stats, long fileSize) {
+ double p = stats.getBytesRead() * 100.0 / fileSize;
+ LOG.info(String.format("%nFileSize: %d%nReadSize: %d%nRead %%: %.2f",
+ fileSize,
+ stats.getBytesRead(),
+ p));
+ return p;
+ }
+
+ private static void readStart() {
+ FileSystem.clearStatistics();
+ }
+
+ private static FileSystem.Statistics readEnd() {
+ return FileSystem.getAllStatistics().get(0);
+ }
+}
diff --git a/java/core/src/test/org/apache/orc/impl/TestRecordReaderUtils.java
b/java/core/src/test/org/apache/orc/impl/TestRecordReaderUtils.java
new file mode 100644
index 000000000..08890b37d
--- /dev/null
+++ b/java/core/src/test/org/apache/orc/impl/TestRecordReaderUtils.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.orc.impl;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+class TestRecordReaderUtils {
+
+ private final BufferChunkList rangeList = new
TestOrcLargeStripe.RangeBuilder()
+ .range(1000, 1000)
+ .range(2000, 1000)
+ .range(4000, 1000)
+ .range(4100, 100)
+ .range(8000, 1000).build();
+
+ private static void assertChunkEquals(BufferChunk expected, BufferChunk
actual) {
+ assertTrue(Objects.equals(expected, actual)
+ && expected.getOffset() == actual.getOffset()
+ && expected.getLength() == actual.getLength());
+ }
+
+ @Test
+ public void testDeterminationOfSingleRead() {
+ BufferChunk toChunk =
RecordReaderUtils.ChunkReader.create(rangeList.get(), 0).getTo();
+ assertChunkEquals(rangeList.get(1), toChunk);
+ assertTrue(RecordReaderUtils.ChunkReader.create(rangeList.get(), toChunk)
+ .getExtraBytesFraction()
+ < 0.001);
+
+ toChunk = RecordReaderUtils.ChunkReader.create(rangeList.get(),
1000).getTo();
+ assertChunkEquals(rangeList.get(3), toChunk);
+ assertTrue(RecordReaderUtils.ChunkReader.create(rangeList.get(), toChunk)
+ .getExtraBytesFraction()
+ >= .2);
+
+ toChunk = RecordReaderUtils.ChunkReader.create(rangeList.get(),
999).getTo();
+ assertChunkEquals(rangeList.get(1), toChunk);
+ assertTrue(RecordReaderUtils.ChunkReader.create(rangeList.get(), toChunk)
+ .getExtraBytesFraction()
+ < 0.001);
+ }
+
+ @Test
+ public void testNoGapCombine() {
+ BufferChunk toChunk = RecordReaderUtils.findSingleRead(rangeList.get());
+ assertChunkEquals(rangeList.get(1), toChunk);
+ }
+
+ @Test
+ public void testReadExtraBytes() {
+ RecordReaderUtils.ChunkReader chunkReader =
+ RecordReaderUtils.ChunkReader.create(rangeList.get(),
+ 1000);
+ assertChunkEquals(rangeList.get(3), chunkReader.getTo());
+ populateAndValidateChunks(chunkReader, false);
+ }
+
+ @Test
+ public void testRemoveBytes() {
+ RecordReaderUtils.ChunkReader chunkReader =
+ RecordReaderUtils.ChunkReader.create(rangeList.get(),
+ 1000);
+ assertChunkEquals(rangeList.get(3), chunkReader.getTo());
+ populateAndValidateChunks(chunkReader, true);
+ }
+
+ @Test
+ public void testRemoveBytesSmallerOverlapFirst() {
+ BufferChunkList rangeList = new TestOrcLargeStripe.RangeBuilder()
+ .range(1000, 1000)
+ .range(2000, 1000)
+ .range(4000, 100)
+ .range(4000, 1000)
+ .range(8000, 1000).build();
+ RecordReaderUtils.ChunkReader chunkReader =
+ RecordReaderUtils.ChunkReader.create(rangeList.get(),
+ 1000);
+ assertChunkEquals(rangeList.get(3), chunkReader.getTo());
+ populateAndValidateChunks(chunkReader, true);
+ }
+
+ @Test
+ public void testRemoveBytesWithOverlap() {
+ BufferChunkList rangeList = new TestOrcLargeStripe.RangeBuilder()
+ .range(1000, 1000)
+ .range(1800, 400)
+ .range(2000, 1000)
+ .range(4000, 100)
+ .range(4000, 1000)
+ .range(8000, 1000).build();
+ RecordReaderUtils.ChunkReader chunkReader =
+ RecordReaderUtils.ChunkReader.create(rangeList.get(),
+ 1000);
+ assertChunkEquals(rangeList.get(4), chunkReader.getTo());
+ populateAndValidateChunks(chunkReader, true);
+ }
+
+ @Test
+ public void testExtraBytesReadWithinThreshold() {
+ BufferChunkList rangeList = new TestOrcLargeStripe.RangeBuilder()
+ .range(1000, 1000)
+ .range(1800, 400)
+ .range(2000, 1000)
+ .range(4000, 100)
+ .range(4000, 1000)
+ .range(8000, 1000).build();
+ RecordReaderUtils.ChunkReader chunkReader =
+ RecordReaderUtils.ChunkReader.create(rangeList.get(),
+ 1000);
+ assertChunkEquals(rangeList.get(4), chunkReader.getTo());
+ chunkReader.populateChunks(makeByteBuffer(chunkReader.getReadBytes(),
+
chunkReader.getFrom().getOffset()),
+ false,
+ 1.0);
+ validateChunks(chunkReader);
+ assertNotEquals(chunkReader.getReadBytes(), chunkReader.getReqBytes());
+ assertEquals(chunkReader.getReadBytes(),
chunkReader.getFrom().getData().array().length);
+ }
+
+ private ByteBuffer makeByteBuffer(int length, long offset) {
+ byte[] readBytes = new byte[length];
+ for (int i = 0; i < readBytes.length; i++) {
+ readBytes[i] = (byte) ((i + offset) % Byte.MAX_VALUE);
+ }
+ return ByteBuffer.wrap(readBytes);
+ }
+
+ private void populateAndValidateChunks(RecordReaderUtils.ChunkReader
chunkReader,
+ boolean withRemove) {
+ if (withRemove) {
+ assertTrue(chunkReader.getReadBytes() > chunkReader.getReqBytes());
+ }
+ ByteBuffer bytes = makeByteBuffer(chunkReader.getReadBytes(),
+ chunkReader.getFrom().getOffset());
+ if (withRemove) {
+ chunkReader.populateChunksReduceSize(bytes, false);
+ assertEquals(chunkReader.getReqBytes(),
chunkReader.getFrom().getData().array().length);
+ } else {
+ chunkReader.populateChunksAsIs(bytes);
+ assertEquals(chunkReader.getReadBytes(),
chunkReader.getFrom().getData().array().length);
+ }
+
+ validateChunks(chunkReader);
+ }
+
+ private void validateChunks(RecordReaderUtils.ChunkReader chunkReader) {
+ BufferChunk current = chunkReader.getFrom();
+ while (current != chunkReader.getTo().next) {
+ assertTrue(current.hasData());
+ assertEquals(current.getOffset() % Byte.MAX_VALUE,
current.getData().get(),
+ String.format("Failed for %s", current));
+ current = (BufferChunk) current.next;
+ }
+ }
+}