[ 
https://issues.apache.org/jira/browse/DRILL-6853?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16688845#comment-16688845
 ] 

ASF GitHub Bot commented on DRILL-6853:
---------------------------------------

ilooner closed pull request #1544: DRILL-6853: Make the complex parquet reader 
batch max row size config…
URL: https://github.com/apache/drill/pull/1544
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index eed4ff830b6..5bda8885932 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -378,12 +378,17 @@ private ExecConstants() {
 
   // Controls the flat parquet reader batching constraints (number of record 
and memory limit)
   public static final String PARQUET_FLAT_BATCH_NUM_RECORDS = 
"store.parquet.flat.batch.num_records";
-  public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR 
= new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, 
ValueVector.MAX_ROW_COUNT,
+  public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR 
= new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, 
ValueVector.MAX_ROW_COUNT -1,
       new OptionDescription("Parquet Reader maximum number of records per 
batch."));
   public static final String PARQUET_FLAT_BATCH_MEMORY_SIZE = 
"store.parquet.flat.batch.memory_size";
   // This configuration is used to overwrite the common memory batch sizing 
configuration property
   public static final OptionValidator PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR 
= new RangeLongValidator(PARQUET_FLAT_BATCH_MEMORY_SIZE, 0, Integer.MAX_VALUE,
-      new OptionDescription("Parquet Reader maximum memory size per batch."));
+      new OptionDescription("Flat Parquet Reader maximum memory size per 
batch."));
+
+  // Controls the complex parquet reader batch sizing configuration
+  public static final String PARQUET_COMPLEX_BATCH_NUM_RECORDS = 
"store.parquet.complex.batch.num_records";
+  public static final OptionValidator 
PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR = new 
RangeLongValidator(PARQUET_COMPLEX_BATCH_NUM_RECORDS, 1, 
ValueVector.MAX_ROW_COUNT -1,
+      new OptionDescription("Complex Parquet Reader maximum number of records 
per batch."));
 
   public static final String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
   public static final BooleanValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = 
new BooleanValidator(JSON_ALL_TEXT_MODE,
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 1d0bca06127..3575d6a0843 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -171,6 +171,7 @@
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
       new 
OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new 
OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new 
OptionDefinition(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR, new 
OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
       new 
OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
       new 
OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 7108ca6727b..09c016a5f80 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -32,6 +32,7 @@
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -82,6 +83,8 @@
   private int recordCount;
   private OperatorContext operatorContext;
   private FragmentContext fragmentContext;
+  /** Configured Parquet records per batch */
+  private final int recordsPerBatch;
 
   // For columns not found in the file, we need to return a schema element 
with the correct number of values
   // at that position in the schema. Currently this requires a vector be 
present. Here is a list of all of these vectors
@@ -105,6 +108,7 @@ public DrillParquetReader(FragmentContext fragmentContext, 
ParquetMetadata foote
     this.entry = entry;
     setColumns(columns);
     this.fragmentContext = fragmentContext;
+    this.recordsPerBatch = (int) 
fragmentContext.getOptions().getLong(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS);
   }
 
   public static MessageType getProjection(MessageType schema,
@@ -299,7 +303,7 @@ public int next() {
       return (int) recordsToRead;
     }
 
-    while (count < 4000 && totalRead < recordCount) {
+    while (count < recordsPerBatch && totalRead < recordCount) {
       recordMaterializer.setPosition(count);
       recordReader.read();
       count++;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf 
b/exec/java-exec/src/main/resources/drill-module.conf
index 5981f2d5501..24577c13a17 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -605,6 +605,7 @@ drill.exec.options: {
     store.parquet.writer.use_single_fs_block: false,
     store.parquet.flat.reader.bulk: true,
     store.parquet.flat.batch.num_records: 32767,
+    store.parquet.complex.batch.num_records: 4000,
     # Using common operators batch configuration unless the Parquet specific
     # configuration is used
     store.parquet.flat.batch.memory_size: 0,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Parquet Complex Reader for nested schema should have configurable memory or 
> max records to fetch
> ------------------------------------------------------------------------------------------------
>
>                 Key: DRILL-6853
>                 URL: https://issues.apache.org/jira/browse/DRILL-6853
>             Project: Apache Drill
>          Issue Type: Bug
>    Affects Versions: 1.14.0
>            Reporter: Nitin Sharma
>            Assignee: salim achouche
>            Priority: Major
>              Labels: doc-impacting, pull-request-available, ready-to-commit
>             Fix For: 1.15.0
>
>
> Parquet Complex reader while fetching nested schema should have configurable 
> memory or max records to fetch and not default to 4000 records.
> While scanning TB of data with wider columns, this could easily cause OOM 
> issues. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to