Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/789#discussion_r118591078
  
    --- Diff: 
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 ---
    @@ -308,163 +232,50 @@ public FragmentContext getFragmentContext() {
       }
     
       /**
    -   * Returns data type length for a given {@see ColumnDescriptor} and its 
corresponding
    -   * {@see SchemaElement}. Neither is enough information alone as the max
    -   * repetition level (indicating if it is an array type) is in the 
ColumnDescriptor and
    -   * the length of a fixed width field is stored at the schema level.
    -   *
    -   * @return the length if fixed width, else -1
    +   * Prepare the Parquet reader. First determine the set of columns to 
read (the schema
    +   * for this read). Then, create a state object to track the read across 
calls to
    +   * the reader's <tt>next()</tt> method. Finally, create one of three 
readers to
    +   * read batches depending on whether this scan is for only fixed-width 
fields,
    +   * contains at least one variable-width field, or is a "mock" scan 
consisting
    +   * only of null fields (fields in the SELECT clause but not in the 
Parquet file).
        */
    -  private int getDataTypeLength(ColumnDescriptor column, SchemaElement se) 
{
    -    if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
    -      if (column.getMaxRepetitionLevel() > 0) {
    -        return -1;
    -      }
    -      if (column.getType() == 
PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
    -        return se.getType_length() * 8;
    -      } else {
    -        return getTypeLengthInBits(column.getType());
    -      }
    -    } else {
    -      return -1;
    -    }
    -  }
     
    -  @SuppressWarnings({ "resource", "unchecked" })
       @Override
       public void setup(OperatorContext operatorContext, OutputMutator output) 
throws ExecutionSetupException {
         this.operatorContext = operatorContext;
    -    if (!isStarQuery()) {
    -      columnsFound = new boolean[getColumns().size()];
    -      nullFilledVectors = new ArrayList<>();
    -    }
    -    columnStatuses = new ArrayList<>();
    -    List<ColumnDescriptor> columns = 
footer.getFileMetaData().getSchema().getColumns();
    -    allFieldsFixedLength = true;
    -    ColumnDescriptor column;
    -    ColumnChunkMetaData columnChunkMetaData;
    -    int columnsToScan = 0;
    -    mockRecordsRead = 0;
    -
    -    MaterializedField field;
    +    schema = new ParquetSchema(fragmentContext.getOptions(), 
rowGroupIndex, isStarQuery() ? null : getColumns());
     
         logger.debug("Reading row group({}) with {} records in file {}.", 
rowGroupIndex, footer.getBlocks().get(rowGroupIndex).getRowCount(),
             hadoopPath.toUri().getPath());
    -    totalRecordsRead = 0;
    -
    -    // TODO - figure out how to deal with this better once we add nested 
reading, note also look where this map is used below
    -    // store a map from column name to converted types if they are non-null
    -    Map<String, SchemaElement> schemaElements = 
ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    -
    -    // loop to add up the length of the fixed width columns and build the 
schema
    -    for (int i = 0; i < columns.size(); ++i) {
    -      column = columns.get(i);
    -      SchemaElement se = schemaElements.get(column.getPath()[0]);
    -      MajorType mt = 
ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
    -          getDataMode(column), se, fragmentContext.getOptions());
    -      field = MaterializedField.create(toFieldName(column.getPath()), mt);
    -      if ( ! fieldSelected(field)) {
    -        continue;
    -      }
    -      columnsToScan++;
    -      int dataTypeLength = getDataTypeLength(column, se);
    -      if (dataTypeLength == -1) {
    -        allFieldsFixedLength = false;
    -      } else {
    -        bitWidthAllFixedFields += dataTypeLength;
    -      }
    -    }
    -
    -    if (columnsToScan != 0  && allFieldsFixedLength) {
    -      recordsPerBatch = (int) Math.min(Math.min(batchSize / 
bitWidthAllFixedFields,
    -          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), 
DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
    -    }
    -    else {
    -      recordsPerBatch = DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
    -    }
     
         try {
    -      ValueVector vector;
    -      SchemaElement schemaElement;
    -      final ArrayList<VarLengthColumn<? extends ValueVector>> 
varLengthColumns = new ArrayList<>();
    -      // initialize all of the column read status objects
    -      boolean fieldFixedLength;
    -      // the column chunk meta-data is not guaranteed to be in the same 
order as the columns in the schema
    -      // a map is constructed for fast access to the correct 
columnChunkMetadata to correspond
    -      // to an element in the schema
    -      Map<String, Integer> columnChunkMetadataPositionsInList = new 
HashMap<>();
    -      BlockMetaData rowGroupMetadata = 
footer.getBlocks().get(rowGroupIndex);
    -
    -      int colChunkIndex = 0;
    -      for (ColumnChunkMetaData colChunk : rowGroupMetadata.getColumns()) {
    -        
columnChunkMetadataPositionsInList.put(Arrays.toString(colChunk.getPath().toArray()),
 colChunkIndex);
    -        colChunkIndex++;
    -      }
    -      for (int i = 0; i < columns.size(); ++i) {
    -        column = columns.get(i);
    -        columnChunkMetaData = 
rowGroupMetadata.getColumns().get(columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
    -        schemaElement = schemaElements.get(column.getPath()[0]);
    -        MajorType type = 
ParquetToDrillTypeConverter.toMajorType(column.getType(), 
schemaElement.getType_length(),
    -            getDataMode(column), schemaElement, 
fragmentContext.getOptions());
    -        field = MaterializedField.create(toFieldName(column.getPath()), 
type);
    -        // the field was not requested to be read
    -        if ( ! fieldSelected(field)) {
    -          continue;
    -        }
    -
    -        fieldFixedLength = column.getType() != 
PrimitiveType.PrimitiveTypeName.BINARY;
    -        vector = output.addField(field, (Class<? extends ValueVector>) 
TypeHelper.getValueVectorClass(type.getMinorType(), type.getMode()));
    -        if (column.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
    -          if (column.getMaxRepetitionLevel() > 0) {
    -            final RepeatedValueVector repeatedVector = 
RepeatedValueVector.class.cast(vector);
    -            ColumnReader<?> dataReader = 
ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
    -                column, columnChunkMetaData, recordsPerBatch,
    -                repeatedVector.getDataVector(), schemaElement);
    -            varLengthColumns.add(new FixedWidthRepeatedReader(this, 
dataReader,
    -                getTypeLengthInBits(column.getType()), -1, column, 
columnChunkMetaData, false, repeatedVector, schemaElement));
    -          }
    -          else {
    -
    -           ColumnReader<?> cr = 
ColumnReaderFactory.createFixedColumnReader(this, fieldFixedLength,
    -                column, columnChunkMetaData, recordsPerBatch, vector,
    -                schemaElement) ;
    -            columnStatuses.add(cr);
    -          }
    -        } else {
    -          // create a reader and add it to the appropriate list
    -          varLengthColumns.add(ColumnReaderFactory.getReader(this, -1, 
column, columnChunkMetaData, false, vector, schemaElement));
    -        }
    -      }
    -      varLengthReader = new VarLenBinaryReader(this, varLengthColumns);
    -
    -      if (!isStarQuery()) {
    -        List<SchemaPath> projectedColumns = 
Lists.newArrayList(getColumns());
    -        SchemaPath col;
    -        for (int i = 0; i < columnsFound.length; i++) {
    -          col = projectedColumns.get(i);
    -          assert col!=null;
    -          if ( ! columnsFound[i] && !col.equals(STAR_COLUMN)) {
    -            
nullFilledVectors.add((NullableIntVector)output.addField(MaterializedField.create(col.getAsUnescapedPath(),
    -                    Types.optional(TypeProtos.MinorType.INT)),
    -                (Class<? extends ValueVector>) 
TypeHelper.getValueVectorClass(TypeProtos.MinorType.INT, DataMode.OPTIONAL)));
    -
    -          }
    -        }
    -      }
    +      schema.buildSchema(footer, batchSize);
    --- End diff --
    
    Nice improvement. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to