This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git
The following commit(s) were added to refs/heads/main by this push:
new f34d9145e ORC-1167: Support orc.row.batch.size configuration (#1108)
f34d9145e is described below
commit f34d9145edb0853e304670a0119b0ba7457b7741
Author: sychen <[email protected]>
AuthorDate: Sat May 7 20:04:10 2022 +0800
ORC-1167: Support orc.row.batch.size configuration (#1108)
Support the `orc.row.batch.size` configuration in `Reader.Options`.
Currently, when an `OrcMapreduceRecordReader` is created, the batch size
defaults to 1024. If we read 1024 relatively large strings, we may hit a
`NegativeArraySizeException`, with no configuration available to reduce
the batch size.
```
java.lang.NegativeArraySizeException
    at org.apache.orc.impl.TreeReaderFactory$BytesColumnVectorUtil.commonReadByteArrays(TreeReaderFactory.java:1544)
    at org.apache.orc.impl.TreeReaderFactory$BytesColumnVectorUtil.readOrcByteArrays(TreeReaderFactory.java:1566)
    at org.apache.orc.impl.TreeReaderFactory$StringDirectTreeReader.nextVector(TreeReaderFactory.java:1662)
    at org.apache.orc.impl.TreeReaderFactory$StringTreeReader.nextVector(TreeReaderFactory.java:1508)
    at org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:2047)
    at org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1219)
    at org.apache.orc.mapreduce.OrcMapreduceRecordReader.ensureBatch(OrcMapreduceRecordReader.java:84)
    at org.apache.orc.mapreduce.OrcMapreduceRecordReader.nextKeyValue(OrcMapreduceRecordReader.java:102)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
```
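With this change, a job that hits the trace above can shrink the batch size
through its Hadoop configuration. A minimal sketch (illustrative only, not
part of this commit; 256 is an arbitrary value):
```
import org.apache.hadoop.conf.Configuration;
import org.apache.orc.OrcConf;

public class BatchSizeConfExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Sets "orc.row.batch.size"; an OrcMapreduceRecordReader built from this
    // conf will create 256-row batches instead of the 1024-row default.
    conf.setInt(OrcConf.ROW_BATCH_SIZE.getAttribute(), 256);
  }
}
```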
- Added a unit test
- Verified in a production environment
Closes #1108
Signed-off-by: Dongjoon Hyun <[email protected]>
---
java/core/src/java/org/apache/orc/OrcConf.java | 5 +-
java/core/src/java/org/apache/orc/Reader.java | 17 +++++
.../org/apache/orc/impl/TreeReaderFactory.java | 14 +++-
.../orc/impl/TestConvertTreeReaderFactory.java | 38 +++++++++++
.../org/apache/orc/impl/TestOrcLargeStripe.java | 74 ++++++++++++++++++++++
.../apache/orc/mapred/OrcMapredRecordReader.java | 2 +-
.../orc/mapreduce/OrcMapreduceRecordReader.java | 2 +-
.../org/apache/orc/mapred/TestOrcOutputFormat.java | 4 +-
.../mapreduce/TestMapreduceOrcOutputFormat.java | 4 +-
9 files changed, 154 insertions(+), 6 deletions(-)
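For reference, the same knob can also be set programmatically via the new
`Reader.Options#rowBatchSize` added in the diff below. A minimal sketch (the
file path is hypothetical; `options()` seeds the value from
`orc.row.batch.size`, which defaults to 1024):
```
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;

public class RowBatchSizeOptionExample {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    try (Reader reader = OrcFile.createReader(new Path("/tmp/example.orc"),
        OrcFile.readerOptions(conf))) {
      // Override the configured batch size for record readers built from
      // these options, e.g. OrcMapreduceRecordReader.
      Reader.Options options = reader.options().rowBatchSize(2);
      System.out.println(options.getRowBatchSize()); // prints 2
    }
  }
}
```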
diff --git a/java/core/src/java/org/apache/orc/OrcConf.java b/java/core/src/java/org/apache/orc/OrcConf.java
index 25bd8b973..e623080bd 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -219,7 +219,10 @@ public enum OrcConf {
"orc.proleptic.gregorian.default", false,
"This value controls whether pre-ORC 27 files are using the hybrid or
proleptic\n" +
"calendar. Only Hive 3.1 and the C++ library wrote using the proleptic,
so hybrid\n" +
- "is the default.")
+ "is the default."),
+ ROW_BATCH_SIZE("orc.row.batch.size", "orc.row.batch.size", 1024,
+ "The number of rows to include in an ORC vectorized reader batch. " +
+ "The value should be carefully chosen to minimize overhead and avoid OOMs in reading data.")
;
private final String attribute;
diff --git a/java/core/src/java/org/apache/orc/Reader.java b/java/core/src/java/org/apache/orc/Reader.java
index 56b14393a..eb1515bfe 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -237,6 +237,7 @@ public interface Reader extends Closeable {
private int minSeekSize = (int) OrcConf.ORC_MIN_DISK_SEEK_SIZE.getDefaultValue();
private double minSeekSizeTolerance = (double) OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE
.getDefaultValue();
+ private int rowBatchSize = (int) OrcConf.ROW_BATCH_SIZE.getDefaultValue();
/**
* @since 1.1.0
@@ -261,6 +262,7 @@ public interface Reader extends Closeable {
allowPluginFilters = OrcConf.ALLOW_PLUGIN_FILTER.getBoolean(conf);
minSeekSize = OrcConf.ORC_MIN_DISK_SEEK_SIZE.getInt(conf);
minSeekSizeTolerance = OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE.getDouble(conf);
+ rowBatchSize = OrcConf.ROW_BATCH_SIZE.getInt(conf);
}
/**
@@ -683,6 +685,21 @@ public interface Reader extends Closeable {
this.minSeekSizeTolerance = value;
return this;
}
+
+ /**
+ * @since 1.9.0
+ */
+ public int getRowBatchSize() {
+ return rowBatchSize;
+ }
+
+ /**
+ * @since 1.9.0
+ */
+ public Options rowBatchSize(int value) {
+ this.rowBatchSize = value;
+ return this;
+ }
}
/**
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index f2ff95ac2..a639e3181 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.ql.io.filter.FilterContext;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcFilterContext;
import org.apache.orc.OrcProto;
@@ -1994,7 +1995,18 @@ public class TreeReaderFactory {
totalLength = (int) (batchSize * scratchlcv.vector[0]);
}
}
-
+ if (totalLength < 0) {
+ StringBuilder sb = new StringBuilder("totalLength:" + totalLength
+ + " is a negative number.");
+ if (batchSize > 1) {
+ sb.append(" The current batch size is ");
+ sb.append(batchSize);
+ sb.append(", you can reduce the value by '");
+ sb.append(OrcConf.ROW_BATCH_SIZE.getAttribute());
+ sb.append("'.");
+ }
+ throw new IOException(sb.toString());
+ }
// Read all the strings for this batch
byte[] allBytes = new byte[totalLength];
int offset = 0;
diff --git a/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java b/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java
index 684d26039..1644210c8 100644
--- a/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java
+++ b/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java
@@ -51,7 +51,9 @@ import java.util.GregorianCalendar;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
public class TestConvertTreeReaderFactory {
@@ -349,4 +351,40 @@ public class TestConvertTreeReaderFactory {
fs.delete(testFilePath, false);
}
}
+
+ @Test
+ public void testReadOrcByteArraysException() {
+ InStream stream = mock(InStream.class);
+ RunLengthIntegerReaderV2 lengths = mock(RunLengthIntegerReaderV2.class);
+ int batchSize = 1024;
+ LongColumnVector defaultBatchSizeScratchlcv = new LongColumnVector(batchSize);
+ for (int i = 0; i < batchSize; i++) {
+ defaultBatchSizeScratchlcv.vector[i] = Integer.MAX_VALUE - 8;
+ }
+
+ BytesColumnVector defaultBatchSizeResult = new BytesColumnVector(batchSize);
+ IOException defaultBatchSizeException = assertThrows(
+ IOException.class,
+ () ->
TreeReaderFactory.BytesColumnVectorUtil.readOrcByteArrays(stream, lengths,
+ defaultBatchSizeScratchlcv, defaultBatchSizeResult,
batchSize));
+
+ assertEquals("totalLength:-9216 is a negative number. " +
+ "The current batch size is 1024, " +
+ "you can reduce the value by 'orc.row.batch.size'.",
+ defaultBatchSizeException.getMessage());
+
+ int batchSizeOne = 1;
+ LongColumnVector batchSizeOneScratchlcv = new LongColumnVector(batchSizeOne);
+ for (int i = 0; i < batchSizeOne; i++) {
+ batchSizeOneScratchlcv.vector[i] = Long.MAX_VALUE;
+ }
+ BytesColumnVector batchSizeOneResult = new BytesColumnVector(batchSizeOne);
+ IOException batchSizeOneException = assertThrows(
+ IOException.class,
+ () ->
TreeReaderFactory.BytesColumnVectorUtil.readOrcByteArrays(stream, lengths,
+ batchSizeOneScratchlcv, batchSizeOneResult, batchSizeOne));
+
+ assertEquals("totalLength:-1 is a negative number.",
+ batchSizeOneException.getMessage());
+ }
}
diff --git a/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
index 10e0ebb78..97e9dc73d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
@@ -42,6 +42,7 @@ import java.nio.charset.StandardCharsets;
import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -214,4 +215,77 @@ public class TestOrcLargeStripe {
fs.delete(testFilePath, false);
}
}
+
+ @Test
+ public void testAdjustRowBatchSizeWhenReadLargeString() throws IOException {
+ final Runtime rt = Runtime.getRuntime();
+ assumeTrue(rt.maxMemory() > 4_000_000_000L);
+ TypeDescription schema = TypeDescription.createString();
+
+ conf.setDouble("hive.exec.orc.dictionary.key.size.threshold", 0.0);
+ Writer writer = OrcFile.createWriter(
+ testFilePath,
+ OrcFile.writerOptions(conf).setSchema(schema)
+ .compress(CompressionKind.NONE));
+ // default batch size
+ int size = 1024;
+ int width = Integer.MAX_VALUE / 1000;
+
+ // generate a random string that is width characters long
+ Random random = new Random(123);
+ char[] randomChars = new char[width];
+ int posn = 0;
+ for(int length = 0; length < width && posn < randomChars.length; ++posn) {
+ char cp = (char) random.nextInt(Character.MIN_SUPPLEMENTARY_CODE_POINT);
+ // make sure we get a valid, non-surrogate
+ while (Character.isSurrogate(cp)) {
+ cp = (char) random.nextInt(Character.MIN_SUPPLEMENTARY_CODE_POINT);
+ }
+ // compute the length of the utf8
+ length += cp < 0x80 ? 1 : (cp < 0x800 ? 2 : 3);
+ randomChars[posn] = cp;
+ }
+
+ // put the random characters in as a repeating value.
+ VectorizedRowBatch batch = schema.createRowBatch();
+ BytesColumnVector string = (BytesColumnVector) batch.cols[0];
+ string.setVal(0, new String(randomChars, 0, posn).getBytes(StandardCharsets.UTF_8));
+ string.isRepeating = true;
+ for(int rows=size; rows > 0; rows -= batch.size) {
+ batch.size = Math.min(rows, batch.getMaxSize());
+ writer.addRowBatch(batch);
+ }
+ writer.close();
+
+ // default batch size
+ IOException exception = assertThrows(
+ IOException.class,
+ () -> {
+ try (Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs))) {
+ RecordReader rows = reader.rows();
+ rows.nextBatch(reader.getSchema().createRowBatch());
+ }
+ }
+ );
+ assertEquals("totalLength:-2095944704 is a negative number. " +
+ "The current batch size is 1024, " +
+ "you can reduce the value by 'orc.row.batch.size'.",
+ exception.getCause().getMessage());
+
+ try {
+ Reader reader = OrcFile.createReader(testFilePath,
+ OrcFile.readerOptions(conf).filesystem(fs));
+ RecordReader rows = reader.rows();
+ // Modify RowBatchMaxSize to reduce from 1024 to 2
+ batch = reader.getSchema().createRowBatch(2);
+ int rowsRead = 0;
+ while (rows.nextBatch(batch)) {
+ rowsRead += batch.size;
+ }
+ assertEquals(size, rowsRead);
+ } finally {
+ fs.delete(testFilePath, false);
+ }
+ }
}
diff --git a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
index 7e6ba3887..4230b8f95 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
@@ -70,7 +70,7 @@ public class OrcMapredRecordReader<V extends WritableComparable>
protected OrcMapredRecordReader(Reader fileReader,
Reader.Options options) throws IOException {
- this(fileReader, options, VectorizedRowBatch.DEFAULT_SIZE);
+ this(fileReader, options, options.getRowBatchSize());
}
protected OrcMapredRecordReader(Reader fileReader,
diff --git a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
index d2937dc51..b4f37f5a9 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
@@ -56,7 +56,7 @@ public class OrcMapreduceRecordReader<V extends WritableComparable>
public OrcMapreduceRecordReader(Reader fileReader,
Reader.Options options) throws IOException {
- this(fileReader, options, VectorizedRowBatch.DEFAULT_SIZE);
+ this(fileReader, options, options.getRowBatchSize());
}
public OrcMapreduceRecordReader(Reader fileReader,
diff --git a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
index 00040ad7e..64799177a 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
@@ -297,6 +297,7 @@ public class TestOrcOutputFormat {
writer.close(Reporter.NULL);
Path path = new Path(workDir, "value.orc");
Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(OrcConf.ROW_BATCH_SIZE.getDefaultValue(), file.options().getRowBatchSize());
assertEquals(3000, file.getNumberOfRows());
assertEquals(TYPE_STRING, file.getSchema().toString());
}
@@ -327,12 +328,13 @@ public class TestOrcOutputFormat {
writer.close(Reporter.NULL);
Path path = new Path(workDir, "key.orc");
Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(128, file.options().getRowBatchSize());
assertEquals(2000, file.getNumberOfRows());
assertEquals(TYPE_STRING, file.getSchema().toString());
}
private static class OrcOutputFormatWithRowBatchSize<V extends Writable>
extends OrcOutputFormat {
- public static final String ROW_BATCH_SIZE = "orc.row.batch.size";
+ public static final String ROW_BATCH_SIZE = OrcConf.ROW_BATCH_SIZE.getAttribute();
@Override
public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem,
diff --git a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
index 702a73755..897bce668 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
@@ -283,6 +283,7 @@ public class TestMapreduceOrcOutputFormat {
writer.close(attemptContext);
Path path = new Path(workDir, "part-m-00000.orc");
Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(OrcConf.ROW_BATCH_SIZE.getDefaultValue(), file.options().getRowBatchSize());
assertEquals(3000, file.getNumberOfRows());
assertEquals(TYPE_STRING, file.getSchema().toString());
}
@@ -313,13 +314,14 @@ public class TestMapreduceOrcOutputFormat {
writer.close(attemptContext);
Path path = new Path(workDir, "part-m-00000.orc");
Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+ assertEquals(128, file.options().getRowBatchSize());
assertEquals(2000, file.getNumberOfRows());
assertEquals(TYPE_STRING, file.getSchema().toString());
}
private static class OrcOutputFormatWithRowBatchSize<V extends Writable>
extends OrcOutputFormat {
private static final String EXTENSION = ".orc";
- public static final String ROW_BATCH_SIZE = "orc.row.batch.size";
+ public static final String ROW_BATCH_SIZE = OrcConf.ROW_BATCH_SIZE.getAttribute();
@Override
public RecordWriter<NullWritable, V>