This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/orc.git


The following commit(s) were added to refs/heads/main by this push:
     new f34d9145e ORC-1167: Support orc.row.batch.size configuration (#1108)
f34d9145e is described below

commit f34d9145edb0853e304670a0119b0ba7457b7741
Author: sychen <[email protected]>
AuthorDate: Sat May 7 20:04:10 2022 +0800

    ORC-1167: Support orc.row.batch.size configuration (#1108)
    
    Support `orc.row.batch.size` configuration in `Reader.Options`.
    
    Now create `OrcMapreduceRecordReader`, the default value of batch size is 
1024.
    
    If we read 1024 relatively large strings, we might get 
`NegativeArraySizeException`, but no configuration to reduce batch size.
    
    ```
    java.lang.NegativeArraySizeException
            at 
org.apache.orc.impl.TreeReaderFactory$BytesColumnVectorUtil.commonReadByteArrays(TreeReaderFactory.java:1544)
            at 
org.apache.orc.impl.TreeReaderFactory$BytesColumnVectorUtil.readOrcByteArrays(TreeReaderFactory.java:1566)
            at 
org.apache.orc.impl.TreeReaderFactory$StringDirectTreeReader.nextVector(TreeReaderFactory.java:1662)
            at 
org.apache.orc.impl.TreeReaderFactory$StringTreeReader.nextVector(TreeReaderFactory.java:1508)
            at 
org.apache.orc.impl.TreeReaderFactory$StructTreeReader.nextBatch(TreeReaderFactory.java:2047)
            at 
org.apache.orc.impl.RecordReaderImpl.nextBatch(RecordReaderImpl.java:1219)
            at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.ensureBatch(OrcMapreduceRecordReader.java:84)
            at 
org.apache.orc.mapreduce.OrcMapreduceRecordReader.nextKeyValue(OrcMapreduceRecordReader.java:102)
            at 
org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
    ```
    
    - Add UT
    - Production environment verification
    
    Closes #1108
    
    Signed-off-by: Dongjoon Hyun <[email protected]>
---
 java/core/src/java/org/apache/orc/OrcConf.java     |  5 +-
 java/core/src/java/org/apache/orc/Reader.java      | 17 +++++
 .../org/apache/orc/impl/TreeReaderFactory.java     | 14 +++-
 .../orc/impl/TestConvertTreeReaderFactory.java     | 38 +++++++++++
 .../org/apache/orc/impl/TestOrcLargeStripe.java    | 74 ++++++++++++++++++++++
 .../apache/orc/mapred/OrcMapredRecordReader.java   |  2 +-
 .../orc/mapreduce/OrcMapreduceRecordReader.java    |  2 +-
 .../org/apache/orc/mapred/TestOrcOutputFormat.java |  4 +-
 .../mapreduce/TestMapreduceOrcOutputFormat.java    |  4 +-
 9 files changed, 154 insertions(+), 6 deletions(-)

diff --git a/java/core/src/java/org/apache/orc/OrcConf.java 
b/java/core/src/java/org/apache/orc/OrcConf.java
index 25bd8b973..e623080bd 100644
--- a/java/core/src/java/org/apache/orc/OrcConf.java
+++ b/java/core/src/java/org/apache/orc/OrcConf.java
@@ -219,7 +219,10 @@ public enum OrcConf {
       "orc.proleptic.gregorian.default", false,
       "This value controls whether pre-ORC 27 files are using the hybrid or 
proleptic\n" +
       "calendar. Only Hive 3.1 and the C++ library wrote using the proleptic, 
so hybrid\n" +
-      "is the default.")
+      "is the default."),
+  ROW_BATCH_SIZE("orc.row.batch.size", "orc.row.batch.size", 1024,
+      "The number of rows to include in a orc vectorized reader batch. " +
+      "The value should be carefully chosen to minimize overhead and avoid 
OOMs in reading data.")
   ;
 
   private final String attribute;
diff --git a/java/core/src/java/org/apache/orc/Reader.java 
b/java/core/src/java/org/apache/orc/Reader.java
index 56b14393a..eb1515bfe 100644
--- a/java/core/src/java/org/apache/orc/Reader.java
+++ b/java/core/src/java/org/apache/orc/Reader.java
@@ -237,6 +237,7 @@ public interface Reader extends Closeable {
     private int minSeekSize = (int) 
OrcConf.ORC_MIN_DISK_SEEK_SIZE.getDefaultValue();
     private double minSeekSizeTolerance = (double) 
OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE
       .getDefaultValue();
+    private int rowBatchSize = (int) OrcConf.ROW_BATCH_SIZE.getDefaultValue();
 
     /**
      * @since 1.1.0
@@ -261,6 +262,7 @@ public interface Reader extends Closeable {
       allowPluginFilters = OrcConf.ALLOW_PLUGIN_FILTER.getBoolean(conf);
       minSeekSize = OrcConf.ORC_MIN_DISK_SEEK_SIZE.getInt(conf);
       minSeekSizeTolerance = 
OrcConf.ORC_MIN_DISK_SEEK_SIZE_TOLERANCE.getDouble(conf);
+      rowBatchSize = OrcConf.ROW_BATCH_SIZE.getInt(conf);
     }
 
     /**
@@ -683,6 +685,21 @@ public interface Reader extends Closeable {
       this.minSeekSizeTolerance = value;
       return this;
     }
+
+    /**
+     * @since 1.9.0
+     */
+    public int getRowBatchSize() {
+      return rowBatchSize;
+    }
+
+    /**
+     * @since 1.9.0
+     */
+    public Options rowBatchSize(int value) {
+      this.rowBatchSize = value;
+      return this;
+    }
   }
 
   /**
diff --git a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java 
b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
index f2ff95ac2..a639e3181 100644
--- a/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
+++ b/java/core/src/java/org/apache/orc/impl/TreeReaderFactory.java
@@ -34,6 +34,7 @@ import 
org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
 import org.apache.hadoop.hive.ql.io.filter.FilterContext;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.orc.OrcConf;
 import org.apache.orc.OrcFile;
 import org.apache.orc.OrcFilterContext;
 import org.apache.orc.OrcProto;
@@ -1994,7 +1995,18 @@ public class TreeReaderFactory {
           totalLength = (int) (batchSize * scratchlcv.vector[0]);
         }
       }
-
+      if (totalLength < 0) {
+        StringBuilder sb = new StringBuilder("totalLength:" + totalLength
+                + " is a negative number.");
+        if (batchSize > 1) {
+          sb.append(" The current batch size is ");
+          sb.append(batchSize);
+          sb.append(", you can reduce the value by '");
+          sb.append(OrcConf.ROW_BATCH_SIZE.getAttribute());
+          sb.append("'.");
+        }
+        throw new IOException(sb.toString());
+      }
       // Read all the strings for this batch
       byte[] allBytes = new byte[totalLength];
       int offset = 0;
diff --git 
a/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java 
b/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java
index 684d26039..1644210c8 100644
--- a/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java
+++ b/java/core/src/test/org/apache/orc/impl/TestConvertTreeReaderFactory.java
@@ -51,7 +51,9 @@ import java.util.GregorianCalendar;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
 
 public class TestConvertTreeReaderFactory {
 
@@ -349,4 +351,40 @@ public class TestConvertTreeReaderFactory {
       fs.delete(testFilePath, false);
     }
   }
+
+  @Test
+  public void testReadOrcByteArraysException() {
+    InStream stream = mock(InStream.class);
+    RunLengthIntegerReaderV2 lengths = mock(RunLengthIntegerReaderV2.class);
+    int batchSize = 1024;
+    LongColumnVector defaultBatchSizeScratchlcv = new 
LongColumnVector(batchSize);
+    for (int i = 0; i < batchSize; i++) {
+      defaultBatchSizeScratchlcv.vector[i] = Integer.MAX_VALUE - 8;
+    }
+
+    BytesColumnVector defaultBatchSizeResult = new 
BytesColumnVector(batchSize);
+    IOException defaultBatchSizeException = assertThrows(
+            IOException.class,
+            () -> 
TreeReaderFactory.BytesColumnVectorUtil.readOrcByteArrays(stream, lengths,
+                    defaultBatchSizeScratchlcv, defaultBatchSizeResult, 
batchSize));
+
+    assertEquals("totalLength:-9216 is a negative number. " +
+                    "The current batch size is 1024, " +
+                    "you can reduce the value by 'orc.row.batch.size'.",
+            defaultBatchSizeException.getMessage());
+
+    int batchSizeOne = 1;
+    LongColumnVector batchSizeOneScratchlcv = new 
LongColumnVector(batchSizeOne);
+    for (int i = 0; i < batchSizeOne; i++) {
+      batchSizeOneScratchlcv.vector[i] = Long.MAX_VALUE;
+    }
+    BytesColumnVector batchSizeOneResult = new BytesColumnVector(batchSizeOne);
+    IOException batchSizeOneException = assertThrows(
+            IOException.class,
+            () -> 
TreeReaderFactory.BytesColumnVectorUtil.readOrcByteArrays(stream, lengths,
+                    batchSizeOneScratchlcv, batchSizeOneResult, batchSizeOne));
+
+    assertEquals("totalLength:-1 is a negative number.",
+            batchSizeOneException.getMessage());
+  }
 }
diff --git a/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java 
b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
index 10e0ebb78..97e9dc73d 100644
--- a/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
+++ b/java/core/src/test/org/apache/orc/impl/TestOrcLargeStripe.java
@@ -42,6 +42,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.Random;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assumptions.assumeTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -214,4 +215,77 @@ public class TestOrcLargeStripe {
       fs.delete(testFilePath, false);
     }
   }
+
+  @Test
+  public void testAdjustRowBatchSizeWhenReadLargeString() throws IOException {
+    final Runtime rt = Runtime.getRuntime();
+    assumeTrue(rt.maxMemory() > 4_000_000_000L);
+    TypeDescription schema = TypeDescription.createString();
+
+    conf.setDouble("hive.exec.orc.dictionary.key.size.threshold", 0.0);
+    Writer writer = OrcFile.createWriter(
+            testFilePath,
+            OrcFile.writerOptions(conf).setSchema(schema)
+                    .compress(CompressionKind.NONE));
+    // default batch size
+    int size = 1024;
+    int width = Integer.MAX_VALUE / 1000;
+
+    // generate a random string that is width characters long
+    Random random = new Random(123);
+    char[] randomChars= new char[width];
+    int posn = 0;
+    for(int length = 0; length < width && posn < randomChars.length; ++posn) {
+      char cp = (char) random.nextInt(Character.MIN_SUPPLEMENTARY_CODE_POINT);
+      // make sure we get a valid, non-surrogate
+      while (Character.isSurrogate(cp)) {
+        cp = (char) random.nextInt(Character.MIN_SUPPLEMENTARY_CODE_POINT);
+      }
+      // compute the length of the utf8
+      length += cp < 0x80 ? 1 : (cp < 0x800 ? 2 : 3);
+      randomChars[posn] = cp;
+    }
+
+    // put the random characters in as a repeating value.
+    VectorizedRowBatch batch = schema.createRowBatch();
+    BytesColumnVector string = (BytesColumnVector) batch.cols[0];
+    string.setVal(0, new String(randomChars, 0, 
posn).getBytes(StandardCharsets.UTF_8));
+    string.isRepeating = true;
+    for(int rows=size; rows > 0; rows -= batch.size) {
+      batch.size = Math.min(rows, batch.getMaxSize());
+      writer.addRowBatch(batch);
+    }
+    writer.close();
+
+    // default batch size
+    IOException exception = assertThrows(
+            IOException.class,
+            () -> {
+              try (Reader reader = OrcFile.createReader(testFilePath,
+                      OrcFile.readerOptions(conf).filesystem(fs))) {
+                RecordReader rows = reader.rows();
+                rows.nextBatch(reader.getSchema().createRowBatch());
+              }
+            }
+    );
+    assertEquals("totalLength:-2095944704 is a negative number. " +
+                    "The current batch size is 1024, " +
+                    "you can reduce the value by 'orc.row.batch.size'.",
+            exception.getCause().getMessage());
+
+    try {
+      Reader reader = OrcFile.createReader(testFilePath,
+              OrcFile.readerOptions(conf).filesystem(fs));
+      RecordReader rows = reader.rows();
+      // Modify RowBatchMaxSize to reduce from 1024 to 2
+      batch = reader.getSchema().createRowBatch(2);
+      int rowsRead = 0;
+      while (rows.nextBatch(batch)) {
+        rowsRead += batch.size;
+      }
+      assertEquals(size, rowsRead);
+    } finally {
+      fs.delete(testFilePath, false);
+    }
+  }
 }
diff --git 
a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java 
b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
index 7e6ba3887..4230b8f95 100644
--- a/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
+++ b/java/mapreduce/src/java/org/apache/orc/mapred/OrcMapredRecordReader.java
@@ -70,7 +70,7 @@ public class OrcMapredRecordReader<V extends 
WritableComparable>
 
   protected OrcMapredRecordReader(Reader fileReader,
                                   Reader.Options options) throws IOException {
-    this(fileReader, options, VectorizedRowBatch.DEFAULT_SIZE);
+    this(fileReader, options, options.getRowBatchSize());
   }
 
   protected OrcMapredRecordReader(Reader fileReader,
diff --git 
a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
 
b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
index d2937dc51..b4f37f5a9 100644
--- 
a/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
+++ 
b/java/mapreduce/src/java/org/apache/orc/mapreduce/OrcMapreduceRecordReader.java
@@ -56,7 +56,7 @@ public class OrcMapreduceRecordReader<V extends 
WritableComparable>
 
   public OrcMapreduceRecordReader(Reader fileReader,
                                   Reader.Options options) throws IOException {
-    this(fileReader, options, VectorizedRowBatch.DEFAULT_SIZE);
+    this(fileReader, options, options.getRowBatchSize());
   }
 
   public OrcMapreduceRecordReader(Reader fileReader,
diff --git 
a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java 
b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
index 00040ad7e..64799177a 100644
--- a/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
+++ b/java/mapreduce/src/test/org/apache/orc/mapred/TestOrcOutputFormat.java
@@ -297,6 +297,7 @@ public class TestOrcOutputFormat {
     writer.close(Reporter.NULL);
     Path path = new Path(workDir, "value.orc");
     Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(OrcConf.ROW_BATCH_SIZE.getDefaultValue(), 
file.options().getRowBatchSize());
     assertEquals(3000, file.getNumberOfRows());
     assertEquals(TYPE_STRING, file.getSchema().toString());
   }
@@ -327,12 +328,13 @@ public class TestOrcOutputFormat {
     writer.close(Reporter.NULL);
     Path path = new Path(workDir, "key.orc");
     Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(128, file.options().getRowBatchSize());
     assertEquals(2000, file.getNumberOfRows());
     assertEquals(TYPE_STRING, file.getSchema().toString());
   }
 
   private static class OrcOutputFormatWithRowBatchSize<V extends Writable> 
extends OrcOutputFormat {
-    public static final String ROW_BATCH_SIZE = "orc.row.batch.size";
+    public static final String ROW_BATCH_SIZE = 
OrcConf.ROW_BATCH_SIZE.getAttribute();
 
     @Override
     public RecordWriter<NullWritable, V> getRecordWriter(FileSystem fileSystem,
diff --git 
a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
 
b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
index 702a73755..897bce668 100644
--- 
a/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
+++ 
b/java/mapreduce/src/test/org/apache/orc/mapreduce/TestMapreduceOrcOutputFormat.java
@@ -283,6 +283,7 @@ public class TestMapreduceOrcOutputFormat {
     writer.close(attemptContext);
     Path path = new Path(workDir, "part-m-00000.orc");
     Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(OrcConf.ROW_BATCH_SIZE.getDefaultValue(), 
file.options().getRowBatchSize());
     assertEquals(3000, file.getNumberOfRows());
     assertEquals(TYPE_STRING, file.getSchema().toString());
   }
@@ -313,13 +314,14 @@ public class TestMapreduceOrcOutputFormat {
     writer.close(attemptContext);
     Path path = new Path(workDir, "part-m-00000.orc");
     Reader file = OrcFile.createReader(path, OrcFile.readerOptions(conf));
+    assertEquals(128, file.options().getRowBatchSize());
     assertEquals(2000, file.getNumberOfRows());
     assertEquals(TYPE_STRING, file.getSchema().toString());
   }
 
   private static class OrcOutputFormatWithRowBatchSize<V extends Writable> 
extends OrcOutputFormat {
     private static final String EXTENSION = ".orc";
-    public static final String ROW_BATCH_SIZE = "orc.row.batch.size";
+    public static final String ROW_BATCH_SIZE = 
OrcConf.ROW_BATCH_SIZE.getAttribute();
 
     @Override
     public RecordWriter<NullWritable, V>

Reply via email to