DRILL-1688: Complex Parquet reader fails to read wide records.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/1e21045b Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/1e21045b Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/1e21045b Branch: refs/heads/master Commit: 1e21045bfdab8fde8fc7d3d3a4182ddd4d2e41f3 Parents: 52b729e Author: Jason Altekruse <altekruseja...@gmail.com> Authored: Tue Nov 11 16:53:55 2014 -0800 Committer: Jacques Nadeau <jacq...@apache.org> Committed: Tue Nov 11 19:27:41 2014 -0800 ---------------------------------------------------------------------- .../org/apache/drill/exec/ExecConstants.java | 5 +++++ .../exec/server/options/SystemOptionManager.java | 2 ++ .../store/parquet/ParquetScanBatchCreator.java | 2 +- .../exec/store/parquet2/DrillParquetReader.java | 19 ++++++++++++++++--- 4 files changed, 24 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java index f01f577..f204506 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java @@ -85,6 +85,11 @@ public interface ExecConstants { public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet"); public static final String PARQUET_BLOCK_SIZE = "store.parquet.block-size"; public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024); + + public static final String PARQUET_VECTOR_FILL_THRESHOLD = 
"store.parquet.vector_fill_threshold"; + public static final OptionValidator PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_THRESHOLD, 99l, 85l); + public static final String PARQUET_VECTOR_FILL_CHECK_THRESHOLD = "store.parquet.vector_fill_check_threshold"; + public static final OptionValidator PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR = new PositiveLongValidator(PARQUET_VECTOR_FILL_CHECK_THRESHOLD, 100l, 10l); public static String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader"; public static OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER, false); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index e802b44..9f912e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -54,6 +54,8 @@ public class SystemOptionManager implements OptionManager { PlannerSettings.HASH_SINGLE_KEY, ExecConstants.OUTPUT_FORMAT_VALIDATOR, ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR, + ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD_VALIDATOR, + ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD_VALIDATOR, ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR, ExecConstants.JSON_READER_ALL_TEXT_MODE_VALIDATOR, ExecConstants.FILESYSTEM_PARTITION_COLUMN_LABEL_VALIDATOR, http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java 
---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 8aebab9..53a6ffc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -118,7 +118,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan ); } else { ParquetMetadata footer = footers.get(e.getPath()); - readers.add(new DrillParquetReader(footer, e, columns, conf)); + readers.add(new DrillParquetReader(context, footer, e, columns, conf)); } if (rowGroupScan.getSelectionRoot() != null) { String[] r = rowGroupScan.getSelectionRoot().split("/"); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/1e21045b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 8b5d035..8765935 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -27,7 +27,9 @@ import java.util.Map; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.memory.OutOfMemoryException; +import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.OperatorContext; import 
org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField.Key; @@ -73,13 +75,23 @@ public class DrillParquetReader extends AbstractRecordReader { private int recordCount; private List<ValueVector> primitiveVectors; private OperatorContext operatorContext; + // The interface for the parquet-mr library does not allow re-winding. To enable us to write into our + // fixed size value vectors, we must check how full the vectors are after some number of reads; for performance + // we avoid doing this on every record. These values are populated with system/session settings to allow users to optimize + // for performance or to allow a wider record size to be supported. + private final int fillLevelCheckFrequency; + private final int fillLevelCheckThreshold; + private FragmentContext fragmentContext; - public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) { + public DrillParquetReader(FragmentContext fragmentContext, ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) { this.footer = footer; this.conf = conf; this.entry = entry; setColumns(columns); + this.fragmentContext = fragmentContext; + fillLevelCheckFrequency = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_CHECK_THRESHOLD).num_val.intValue(); + fillLevelCheckThreshold = this.fragmentContext.getOptions().getOption(ExecConstants.PARQUET_VECTOR_FILL_THRESHOLD).num_val.intValue(); } public static MessageType getProjection(MessageType schema, Collection<SchemaPath> columns) { @@ -200,8 +212,8 @@ public class DrillParquetReader extends AbstractRecordReader { recordReader.read(); count++; totalRead++; - if (count % 100 == 0) { - if (getPercentFilled() > 85) { + if (count % fillLevelCheckFrequency == 0) { + if (getPercentFilled() > fillLevelCheckThreshold) { break; } } @@ -217,6 +229,7 @@ public class DrillParquetReader extends AbstractRecordReader
{ if (v instanceof VariableWidthVector) { filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity()); } + // TODO - need to re-enable this // if (v instanceof RepeatedFixedWidthVector) { // filled = Math.max(filled, ((RepeatedFixedWidthVector) v).getAccessor().getGroupCount() * 100) // }