This is an automated email from the ASF dual-hosted git repository.
timothyfarkas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
     new 9d62585  DRILL-6853: Make the complex parquet reader batch max row size configurable
9d62585 is described below
commit 9d62585dc7e42376e075c2a30cf12cc7f8db5b0e
Author: Salim Achouche <[email protected]>
AuthorDate: Thu Nov 15 14:40:48 2018 -0800
DRILL-6853: Make the complex parquet reader batch max row size configurable
---
.../src/main/java/org/apache/drill/exec/ExecConstants.java | 9 +++++++--
.../apache/drill/exec/server/options/SystemOptionManager.java | 1 +
.../org/apache/drill/exec/store/parquet2/DrillParquetReader.java | 6 +++++-
exec/java-exec/src/main/resources/drill-module.conf | 1 +
4 files changed, 14 insertions(+), 3 deletions(-)
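The new store.parquet.complex.batch.num_records option is registered with SYSTEM_AND_SESSION scope, so it can be tuned at runtime like the existing flat-reader options. A minimal usage sketch, assuming the standard Drill ALTER SESSION / ALTER SYSTEM syntax and an illustrative value of 8000 (any value accepted by the validator range of 1 to ValueVector.MAX_ROW_COUNT - 1):

    -- Raise the complex Parquet reader row limit for the current session only
    ALTER SESSION SET `store.parquet.complex.batch.num_records` = 8000;

    -- Or change the cluster-wide default for all sessions
    ALTER SYSTEM SET `store.parquet.complex.batch.num_records` = 8000;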
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index c65ce2b..95168bc 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -388,12 +388,17 @@ public final class ExecConstants {
  // Controls the flat parquet reader batching constraints (number of record and memory limit)
  public static final String PARQUET_FLAT_BATCH_NUM_RECORDS = "store.parquet.flat.batch.num_records";
-  public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, ValueVector.MAX_ROW_COUNT,
+  public static final OptionValidator PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_NUM_RECORDS, 1, ValueVector.MAX_ROW_COUNT -1,
      new OptionDescription("Parquet Reader maximum number of records per batch."));
  public static final String PARQUET_FLAT_BATCH_MEMORY_SIZE = "store.parquet.flat.batch.memory_size";
  // This configuration is used to overwrite the common memory batch sizing configuration property
  public static final OptionValidator PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR = new RangeLongValidator(PARQUET_FLAT_BATCH_MEMORY_SIZE, 0, Integer.MAX_VALUE,
-      new OptionDescription("Parquet Reader maximum memory size per batch."));
+      new OptionDescription("Flat Parquet Reader maximum memory size per batch."));
+
+  // Controls the complex parquet reader batch sizing configuration
+  public static final String PARQUET_COMPLEX_BATCH_NUM_RECORDS = "store.parquet.complex.batch.num_records";
+  public static final OptionValidator PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR = new RangeLongValidator(PARQUET_COMPLEX_BATCH_NUM_RECORDS, 1, ValueVector.MAX_ROW_COUNT -1,
+      new OptionDescription("Complex Parquet Reader maximum number of records per batch."));
  public static final String JSON_ALL_TEXT_MODE = "store.json.all_text_mode";
  public static final BooleanValidator JSON_READER_ALL_TEXT_MODE_VALIDATOR = new BooleanValidator(JSON_ALL_TEXT_MODE,
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 2acde8a..87e5286 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -175,6 +175,7 @@ public class SystemOptionManager extends BaseOptionManager implements AutoClosea
      new OptionDefinition(ExecConstants.PARQUET_FLAT_READER_BULK_VALIDATOR),
      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
      new OptionDefinition(ExecConstants.PARQUET_FLAT_BATCH_MEMORY_SIZE_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
+      new OptionDefinition(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS_VALIDATOR, new OptionMetaData(OptionValue.AccessibleScopes.SYSTEM_AND_SESSION, true, true)),
      new OptionDefinition(ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR),
      new OptionDefinition(ExecConstants.JSON_WRITER_NAN_INF_NUMBERS_VALIDATOR),
      new OptionDefinition(ExecConstants.JSON_READER_NAN_INF_NUMBERS_VALIDATOR),
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 7108ca6..09c016a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -32,6 +32,7 @@ import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.FragmentContext;
@@ -82,6 +83,8 @@ public class DrillParquetReader extends AbstractRecordReader {
private int recordCount;
private OperatorContext operatorContext;
private FragmentContext fragmentContext;
+ /** Configured Parquet records per batch */
+ private final int recordsPerBatch;
// For columns not found in the file, we need to return a schema element with the correct number of values
// at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
@@ -105,6 +108,7 @@ public class DrillParquetReader extends AbstractRecordReader {
this.entry = entry;
setColumns(columns);
this.fragmentContext = fragmentContext;
+ this.recordsPerBatch = (int) fragmentContext.getOptions().getLong(ExecConstants.PARQUET_COMPLEX_BATCH_NUM_RECORDS);
}
public static MessageType getProjection(MessageType schema,
@@ -299,7 +303,7 @@ public class DrillParquetReader extends AbstractRecordReader {
return (int) recordsToRead;
}
- while (count < 4000 && totalRead < recordCount) {
+ while (count < recordsPerBatch && totalRead < recordCount) {
recordMaterializer.setPosition(count);
recordReader.read();
count++;
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index dfbbbcb..632b4ed 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -613,6 +613,7 @@ drill.exec.options: {
store.parquet.writer.use_single_fs_block: false,
store.parquet.flat.reader.bulk: true,
store.parquet.flat.batch.num_records: 32767,
+ store.parquet.complex.batch.num_records: 4000,
# Using common operators batch configuration unless the Parquet specific
# configuration is used
store.parquet.flat.batch.memory_size: 0,
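The shipped default of 4000 records matches the limit that was previously hard-coded in DrillParquetReader, so behavior is unchanged until the option is overridden. The effective value can be inspected at runtime; a sketch, assuming the standard sys.options system table:

    -- Show the current setting of the complex Parquet reader batch row limit
    SELECT * FROM sys.options WHERE name = 'store.parquet.complex.batch.num_records';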