[ https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959867#comment-15959867 ]
ASF GitHub Bot commented on DRILL-5356:
---------------------------------------
Github user ppadma commented on a diff in the pull request:
https://github.com/apache/drill/pull/789#discussion_r110277141
--- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---
@@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() {
   }
   /**
-   * Returns data type length for a given {@see ColumnDescriptor} and it's corresponding
-   * {@see SchemaElement}. Neither is enough information alone as the max
-   * repetition level (indicating if it is an array type) is in the ColumnDescriptor and
-   * the length of a fixed width field is stored at the schema level.
-   *
-   * @return the length if fixed width, else -1
+   * Prepare the Parquet reader. First determine the set of columns to read (the schema
+   * for this read.) Then, create a state object to track the read across calls to
+   * the reader <tt>next()</tt> method. Finally, create one of three readers to
+   * read batches depending on whether this scan is for only fixed-width fields,
+   * contains at least one variable-width field, or is a "mock" scan consisting
+   * only of null fields (fields in the SELECT clause but not in the Parquet file.)
    */
-  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) {
-    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-      if (column.getMaxRepetitionLevel() > 0) {
-        return -1;
-      }
-      if (column.getType() == PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
-        return se.getType_length() * 8;
-      } else {
-        return getTypeLengthInBits(column.getType());
-      }
-    } else {
-      return -1;
-    }
-  }
-  @SuppressWarnings({ "resource", "unchecked" })
   @Override
   public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
     this.operatorContext = operatorContext;
-    if (!isStarQuery()) {
-      columnsFound = new boolean[getColumns().size()];
-      nullFilledVectors = new ArrayList<>();
-    }
-    columnStatuses = new ArrayList<>();
-    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
-    allFieldsFixedLength = true;
-    ColumnDescriptor column;
-    ColumnChunkMetaData columnChunkMetaData;
-    int columnsToScan = 0;
-    mockRecordsRead = 0;
-
-    MaterializedField field;
+    schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, isStarQuery() ? null : getColumns());
     logger.debug("Reading row group({}) with {} records in file {}.",
         rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(), hadoopPath.toUri().getPath());
-    totalRecordsRead = 0;
-
-    // TODO - figure out how to deal with this better once we add nested reading, note also look where this map is used below
-    // store a map from column name to converted types if they are non-null
-    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
-
-    // loop to add up the length of the fixed width columns and build the schema
-    for (int i = 0; i < columns.size(); ++i) {
-      column = columns.get(i);
-      SchemaElement se = schemaElements.get(column.getPath()[0]);
-      MajorType mt = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
-          getDataMode(column), se, fragmentContext.getOptions());
-      field = MaterializedField.create(toFieldName(column.getPath()), mt);
-      if ( ! fieldSelected(field)) {
-        continue;
-      }
-      columnsToScan++;
-      int dataTypeLength = getDataTypeLength(column, se);
-      if (dataTypeLength == -1) {
-        allFieldsFixedLength = false;
-      } else {
-        bitWidthAllFixedFields += dataTypeLength;
-      }
-    }
-
-    if (columnsToScan != 0 && allFieldsFixedLength) {
-      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
-          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
-    }
-    else {
-      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
-    }
     try {
-      ValueVector vector;
-      SchemaElement schemaElement;
-      final ArrayList<VarLengthColumn<? extends ValueVector>> varLengthColumns = new ArrayList<>();
-      // initialize all of the column read status objects
-      boolean fieldFixedLength;
-      // the column chunk meta-data is not guaranteed to be in the same order as the columns in the schema
-      // a map is constructed for fast access to the correct columnChunkMetadata to correspond
-      // to an element in the schema
-      Map<String, Integer> columnChunkMetadataPositionsInList = new HashMap<>();
-      BlockMetaData rowGroupMetadata = footer.getBlocks().get(rowGroupIndex);
-
-      int colChunkIndex = 0;
-      for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
-        columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()), colChunkIndex);
-        colChunkIndex++;
-      }
-      for (int i = 0; i < columns.size(); ++i) {
-        column = columns.get(i);
-        columnChunkMetaData = rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
-        schemaElement = schemaElements.get(column.getPath()[0]);
-        MajorType type = ParquetToDrillTypeConverter.toMajorType(column.getType(), schemaElement.getType_length(),
-            getDataMode(column), schemaElement, fragmentContext.getOptions());
-        field = MaterializedField.create(toFieldName(column.getPath()), type);
-        // the field was not requested to be read
-        if ( ! fieldSelected(field)) {
-          continue;
-        }
-
-        fieldFixedLength = column.getType() != PrimitiveType.PrimitiveTypeName.BINARY;
-        vector = output.addField(field, (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
-        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
-          if (column.getMaxRepetitionLevel() > 0) {
-            final RepeatedValueVector repeatedVector = RepeatedValueVector.class.cast(vector);
-            ColumnReader<?> dataReader = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
-                column, columnChunkMetaData, recordsPerBatch,
-                repeatedVector.getDataVector(), schemaElement);
-            varLengthColumns.add(new FixedWidthRepeatedReader(this, dataReader,
-                getTypeLengthInBits(column.getType()), -1, column, columnChunkMetaData, false, repeatedVector, schemaElement));
-          }
-          else {
-
-            ColumnReader<?> cr = ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
-                column, columnChunkMetaData, recordsPerBatch, vector,
-                schemaElement) ;
-            columnStatuses.add(cr);
-          }
-        } else {
-          // create a reader and add it to the appropriate list
-          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, column, columnChunkMetaData, false, vector, schemaElement));
-        }
-      }
-      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
-
-      if (!isStarQuery()) {
-        List<SchemaPath> projectedColumns = Lists.newArrayList(getColumns());
-        SchemaPath col;
-        for (int i = 0; i < columnsFound.length; i++) {
-          col = projectedColumns.get(i);
-          assert col != null;
-          if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
-            nullFilledVectors.add((NullableIntVector)output.addField(MaterializedField.create(col.getAsUnescapedPath(),
-                Types.optional(TypeProtos.MinorType.INT)),
-                (Class<? extends ValueVector>) TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL)));
-
-          }
-        }
-      }
+      schema.buildSchema(footer, batchSize);
--- End diff --
Maybe pass the footer in the constructor of ParquetSchema itself?
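For illustration only, here is a rough sketch of what that suggestion could look like. The ParquetSchema constructor parameters, field names, and import paths below are guesses inferred from this diff, not the actual code in the pull request:

import java.util.Collection;

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;

// Hypothetical sketch of the suggestion above: ParquetSchema keeps the footer it
// is handed at construction, so buildSchema() no longer takes it as an argument.
// Names and signatures here are assumptions, and package names may differ by
// Drill/Parquet version.
public class ParquetSchema {
  private final OptionManager options;
  private final int rowGroupIndex;
  private final ParquetMetadata footer;
  private final Collection<SchemaPath> selectedCols; // null means a star query

  public ParquetSchema(OptionManager options, int rowGroupIndex,
                       ParquetMetadata footer, Collection<SchemaPath> selectedCols) {
    this.options = options;
    this.rowGroupIndex = rowGroupIndex;
    this.footer = footer;
    this.selectedCols = selectedCols;
  }

  public void buildSchema(long batchSize) {
    // Walk footer.getFileMetaData().getSchema().getColumns() and build the
    // per-column metadata, as the current buildSchema(footer, batchSize) does.
  }
}

// The call site in ParquetRecordReader.setup() would then read:
//   schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer,
//       isStarQuery() ? null : getColumns());
//   ...
//   schema.buildSchema(batchSize);

Passing the footer once at construction would keep all of the schema-building inputs in one object instead of splitting them between the constructor and buildSchema().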
> Refactor Parquet Record Reader
> ------------------------------
>
> Key: DRILL-5356
> URL: https://issues.apache.org/jira/browse/DRILL-5356
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Priority: Minor
> Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend
> an uncomfortable amount of time trying to understand the code. In particular,
> this writer needs to figure out how to convince the reader to provide
> higher-density record batches.
> Rather than continue to decipher the complex code multiple times, this ticket
> requests to refactor the code to make it functionally identical, but
> structurally cleaner. The result will be faster time to value when working
> with this code.
> This is a lower-priority change and will be coordinated with others working
> on this code base. This ticket is only for the record reader class itself; it
> does not include the various readers and writers that Parquet uses since
> another project is actively modifying those classes.