This is an automated email from the ASF dual-hosted git repository.

timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d62585  DRILL-6853: Make the complex parquet reader batch max row size configurable
9d62585 is described below

commit 9d62585dc7e42376e075c2a30cf12cc7f8db5b0e
Author: Salim Achouche <[email protected]>
AuthorDate: Thu Nov 15 14:40:48 2018 -0800

    DRILL-6853: Make the complex parquet reader batch max row size configurable
---
 .../src/main/java/org/apache/drill/exec/ExecConstants.java       | 9 +++++++--
 .../apache/drill/exec/server/options/SystemOptionManager.java    | 1 +
 .../org/apache/drill/exec/store/parquet2/DrillParquetReader.java | 6 +++++-
 exec/java-exec/src/main/resources/drill-module.conf              | 1 +
 4 files changed, 14 insertions(+), 3 deletions(-)

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c65ce2b..95168bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -388,12 +388,17 @@ public final class ExecConstants {
 
   // Controls the flat parquet reader batching constraints (number of record and memory limit)
   public static final String PARQUET_FLAT_BATCH_NUM_RECORDS = "store.parquet.flat.batch.num_records";
-  public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, ValueVector.MAX_ROW_COUNT,
+  public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, ValueVector.MAX_ROW_COUNT -1,
       new OptionDescription("Parquet Reader maximum number of records per batch."));
   public static final String PARQUET_FLAT_BATCH_MEMORY_SIZE = "store.parquet.flat.batch.memory_size";
   // This configuration is used to overwrite the common memory batch sizing configuration property
   public static final OptionValidator PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_MEMORY_SIZE, 0, Integer.MAX_VALUE,
-      new OptionDescription("Parquet Reader maximum memory size per batch."));
+      new OptionDescription("Flat Parquet Reader maximum memory size per batch."));
+
+  // Controls the complex parquet reader batch sizing configuration
+  public static final String PARQUET_COMPLEX_BATCH_NUM_RECORDS = "store.parquet.complex.batch.num_records";
+  public static final OptionValidator PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_COMPLEX_BATCH_NUM_RECORDS, 1, ValueVector.MAX_ROW_COUNT -1,
+      new OptionDescription("Complex Parquet Reader maximum number of records per batch."));
 
   public static final String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
   public static final BooleanValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE,
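
For reference, a minimal sketch of how the new option could be overridden at runtime, assuming Drill's standard ALTER SESSION / ALTER SYSTEM option syntax; 8192 is an arbitrary illustrative value and must fall within the validator range of 1 to ValueVector.MAX_ROW_COUNT - 1:

    ALTER SESSION SET `store.parquet.complex.batch.num_records` = 8192;
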
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 2acde8a..87e5286 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -175,6 +175,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
       new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
       new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
       new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 7108ca6..09c016a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.OutOfMemoryException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
@@ -82,6 +83,8 @@ public class DrillParquetReader extends AbstractRecordReader {
   private int recordCount;
   private OperatorContext operatorContext;
   private FragmentContext fragmentContext;
+  /** Configured Parquet records per batch */
+  private final int recordsPerBatch;
 
   // For columns not found in the file, we need to return a schema element with the correct number of values
   // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
@@ -105,6 +108,7 @@ public class DrillParquetReader extends AbstractRecordReader {
     this.entry = entry;
     setColumns(columns);
     this.fragmentContext = fragmentContext;
+    this.recordsPerBatch = (int) fragmentContext.getOptions().getLong(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS);
   }
 
   public static MessageType getProjection(MessageType schema,
@@ -299,7 +303,7 @@ public class DrillParquetReader extends AbstractRecordReader {
       return (int) recordsToRead;
     }
 
-    while (count < 4000 && totalRead < recordCount) {
+    while (count < recordsPerBatch && totalRead < recordCount) {
       recordMaterializer.setPosition(count);
       recordReader.read();
       count++;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index dfbbbcb..632b4ed 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -613,6 +613,7 @@ drill.exec.options: {
     store.parquet.writer.use_single_fs_block: false,
     store.parquet.flat.reader.bulk: true,
     store.parquet.flat.batch.num_records: 32767,
+    store.parquet.complex.batch.num_records: 4000,
     # Using common operators batch configuration unless the Parquet specific
     # configuration is used
     store.parquet.flat.batch.memory_size: 0,
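
Since the shipped default of 4000 in drill-module.conf matches the previously hardcoded limit, existing behavior is unchanged unless the option is overridden. As a sketch, assuming the standard sys.options system table, the effective value can be checked with:

    SELECT * FROM sys.options WHERE name = 'store.parquet.complex.batch.num_records';
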
