[ https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15959869#comment-15959869 ]

ASF GitHub Bot commented on DRILL-5356:
---------------------------------------

Github user ppadma commented on a diff in the pull request:

    https://github.com/apache/drill/pull/789#discussion_r110279210
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java ---
    @@ -0,0 +1,262 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.drill.exec.store.parquet.columnreaders;
    +
    +import java.util.ArrayList;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.List;
    +import java.util.Map;
    +
    +import org.apache.drill.common.expression.SchemaPath;
    +import org.apache.drill.common.types.TypeProtos;
    +import org.apache.drill.common.types.Types;
    +import org.apache.drill.common.types.TypeProtos.DataMode;
    +import org.apache.drill.exec.exception.SchemaChangeException;
    +import org.apache.drill.exec.expr.TypeHelper;
    +import org.apache.drill.exec.physical.impl.OutputMutator;
    +import org.apache.drill.exec.record.MaterializedField;
    +import org.apache.drill.exec.server.options.OptionManager;
    +import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
    +import org.apache.drill.exec.vector.NullableIntVector;
    +import org.apache.parquet.column.ColumnDescriptor;
    +import org.apache.parquet.format.SchemaElement;
    +import org.apache.parquet.hadoop.metadata.BlockMetaData;
    +import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
    +import org.apache.parquet.hadoop.metadata.ParquetMetadata;
    +
    +import com.google.common.collect.Lists;
    +
    +/**
    + * Maps the schema of the Parquet file to the schema that Drill and the
    + * Parquet record reader use.
    + */
    +
    +public class ParquetSchema {
    +  /**
    +   * Set of columns specified in the SELECT clause. Will be null for
    +   * a SELECT * query.
    +   */
    +  private final Collection<SchemaPath> selectedCols;
    +  /**
    +   * Parallel to the columns list above; used to determine the subset of the
    +   * project push-down columns that do not appear in this file.
    +   */
    +  private final boolean[] columnsFound;
    +  private final OptionManager options;
    +  private final int rowGroupIndex;
    +  private ParquetMetadata footer;
    +  /**
    +   * List of metadata for selected columns. This list does two things.
    +   * First, it identifies the Parquet columns we wish to select. Second, it
    +   * provides metadata for those columns. Note that null columns (columns
    +   * in the SELECT clause but not in the file) appear elsewhere.
    +   */
    +  private List<ParquetColumnMetadata> selectedColumnMetadata = new ArrayList<>();
    +  private int bitWidthAllFixedFields;
    +  private boolean allFieldsFixedLength;
    +  private long groupRecordCount;
    +  private int recordsPerBatch;
    +
    +  /**
    +   * Build the Parquet schema. The schema can be based on a "SELECT *",
    +   * meaning we want all columns defined in the Parquet file. In this case,
    +   * the list of selected columns is null. Or, the query can be based on
    +   * an explicit list of selected columns. In this case, the
    +   * columns need not exist in the Parquet file. If a column does not exist,
    +   * the reader returns null for that column. If no selected column exists
    +   * in the file, then we return "mock" records: records with only null
    +   * values, but repeated for the number of rows in the Parquet file.
    +   *
    +   * @param options session options
    +   * @param rowGroupIndex row group to read
    +   * @param selectedCols columns specified in the SELECT clause, or null if
    +   * this is a SELECT * query
    +   */
    +
    +  public ParquetSchema(OptionManager options, int rowGroupIndex, Collection<SchemaPath> selectedCols) {
    +    this.options = options;
    +    this.rowGroupIndex = rowGroupIndex;
    +    this.selectedCols = selectedCols;
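    +    // A star query has no explicit column list, so there is nothing to
    +    // track; otherwise, record which selected columns appear in the file.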
    +    if (selectedCols == null) {
    +      columnsFound = null;
    +    } else {
    +      columnsFound = new boolean[selectedCols.size()];
    +    }
    +  }
    +
    +  /**
    +   * Build the schema for this read as a combination of the schema specified in
    +   * the Parquet footer and the list of columns selected in the query.
    +   *
    +   * @param footer Parquet metadata
    +   * @param batchSize target size of the batch, in bits
    +   * @throws Exception if anything goes wrong
    +   */
    +
    +  public void buildSchema(ParquetMetadata footer, long batchSize) throws Exception {
    +    this.footer = footer;
    +    groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
    +    loadParquetSchema();
    +    computeFixedPart();
    +
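    +    // All selected fields fixed width: the batch row count is the smallest
    +    // of (a) the rows that fit the batch's bit budget (batchSize /
    +    // bitWidthAllFixedFields), (b) the value count of the first row group's
    +    // first column, and (c) a configured ceiling.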
    +    if (! selectedColumnMetadata.isEmpty() && allFieldsFixedLength) {
    +      recordsPerBatch = (int) Math.min(Math.min(batchSize / bitWidthAllFixedFields,
    +          footer.getBlocks().get(0).getColumns().get(0).getValueCount()), ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
    +    } else {
    +      recordsPerBatch = ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
    +    }
    +  }
    +
    +  /**
    +   * Scan the Parquet footer, then map each Parquet column to the list of columns
    +   * we want to read. Track those to be read.
    +   */
    +
    +  private void loadParquetSchema() {
    +    // TODO - figure out how to deal with this better once we add nested reading; note also where this map is used below
    +    // store a map from column name to converted types if they are non-null
    +    Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
    +
    +    // loop over the Parquet columns, resolve each column to a Drill type, and keep those selected by the query
    +    for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
    +      ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
    +      columnMetadata.resolveDrillType(schemaElements, options);
    +      if (! fieldSelected(columnMetadata.field)) {
    +        continue;
    +      }
    +      selectedColumnMetadata.add(columnMetadata);
    +    }
    +  }
    +
    +  /**
    +   * Fixed-width fields are the easiest to plan. We know the size of each column,
    +   * making it easy to determine the total length of each vector, once we know
    +   * the target record count. A special reader is used in the fortunate case
    +   * that all fields are fixed width.
    +   */
    +
    +  private void computeFixedPart() {
    +    allFieldsFixedLength = true;
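    +    // Sum the per-row bit widths of the fixed-width columns; a single
    +    // variable-width column disables the all-fixed-width fast path.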
    +    for (ParquetColumnMetadata colMd : selectedColumnMetadata) {
    +      if (colMd.isFixedLength()) {
    +        bitWidthAllFixedFields += colMd.length;
    +      } else {
    +        allFieldsFixedLength = false;
    +      }
    +    }
    +  }
    +
    +  public boolean isStarQuery() { return selectedCols == null; }
    +  public ParquetMetadata footer() { return footer; }
    +  public int getBitWidthAllFixedFields() { return bitWidthAllFixedFields; }
    +  public int getRecordsPerBatch() { return recordsPerBatch; }
    +  public boolean allFieldsFixedLength() { return allFieldsFixedLength; }
    +  public List<ParquetColumnMetadata> getColumnMetadata() { return selectedColumnMetadata; }
    +
    +  /**
    +   * Return the row count for the row group being read.
    +   *
    +   * @return number of records in the Parquet row group
    +   */
    +
    +  public long getGroupRecordCount() { return groupRecordCount; }
    +
    +  public BlockMetaData getRowGroupMetadata() {
    +    return footer.getBlocks().get(rowGroupIndex);
    +  }
    +
    +  /**
    +   * Determine if a Parquet field is selected for the query. It is selected
    +   * either if this is a star query (we want all columns), or the column
    +   * appears in the SELECT list.
    +   *
    +   * @param field the Parquet column expressed as a Drill field.
    +   * @return true if the column is to be included in the scan, false
    +   * if not
    +   */
    +
    +  private boolean fieldSelected(MaterializedField field) {
    +    // TODO - not sure if this is how we want to represent this
    +    // for now it makes the existing tests pass, simply selecting
    +    // all available data if no columns are provided
    +    if (isStarQuery()) {
    +      return true;
    +    }
    +
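    +    // Compare the full, unescaped path, ignoring case. Record each match in
    +    // columnsFound so that null columns can later be created for selected
    +    // columns that do not appear in the file.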
    +    int i = 0;
    +    for (SchemaPath expr : selectedCols) {
    +      if (field.getPath().equalsIgnoreCase(expr.getAsUnescapedPath())) {
    +        columnsFound[i] = true;
    +        return true;
    +      }
    +      i++;
    +    }
    +    return false;
    +  }
    +  /**
    --- End diff ---
    
    add a new line after end of function
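
    For context, here is a minimal sketch of how this class is meant to be
    driven (hypothetical caller code, not part of the patch: conf, filePath,
    optionManager, selectedCols, and batchSizeInBits are assumed to come from
    the surrounding ParquetRecordReader):

        ParquetMetadata footer = ParquetFileReader.readFooter(conf, filePath);
        ParquetSchema schema = new ParquetSchema(optionManager, 0 /* row group index */, selectedCols);
        schema.buildSchema(footer, batchSizeInBits);
        if (schema.allFieldsFixedLength()) {
          // All columns are fixed width: vectors can be sized for exactly this many rows.
          int rowsPerBatch = schema.getRecordsPerBatch();
        }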


> Refactor Parquet Record Reader
> ------------------------------
>
>                 Key: DRILL-5356
>                 URL: https://issues.apache.org/jira/browse/DRILL-5356
>             Project: Apache Drill
>          Issue Type: Improvement
>    Affects Versions: 1.10.0, 1.11.0
>            Reporter: Paul Rogers
>            Assignee: Paul Rogers
>            Priority: Minor
>             Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over 
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend 
> an uncomfortable amount of time trying to understand the code. In particular, 
> this writer needs to figure out how to convince the reader to provide 
> higher-density record batches.
> Rather than continue to decipher the complex code multiple times, this ticket 
> requests to refactor the code to make it functionally identical, but 
> structurally cleaner. The result will be faster time to value when working 
> with this code.
> This is a lower-priority change and will be coordinated with others working 
> on this code base. This ticket is only for the record reader class itself; it 
> does not include the various readers and writers that Parquet uses since 
> another project is actively modifying those classes.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
