[
https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16025437#comment-16025437
]
ASF GitHub Bot commented on DRILL-5356:
---------------------------------------
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/789#discussion_r118591297
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetSchema.java
---
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.expr.TypeHelper;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.format.SchemaElement;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Mapping from the schema of the Parquet file to the schema that Drill
+ * and the Parquet record reader use.
+ */
+
+public class ParquetSchema {
+ /**
+ * Set of columns specified in the SELECT clause. Will be null for
+ * a SELECT * query.
+ */
+ private final Collection<SchemaPath> selectedCols;
+ /**
+ * Parallel array to the selected columns above; it is used to determine the
+ * subset of the project pushdown columns that do not appear in this file.
+ */
+ private final boolean[] columnsFound;
+ private final OptionManager options;
+ private final int rowGroupIndex;
+ private ParquetMetadata footer;
+ /**
+ * List of metadata for selected columns. This list does two things.
+ * First, it identifies the Parquet columns we wish to select. Second, it
+ * provides metadata for those columns. Note that null columns (columns
+ * in the SELECT clause but not in the file) appear elsewhere.
+ */
+ private List<ParquetColumnMetadata> selectedColumnMetadata = new
ArrayList<>();
+ private int bitWidthAllFixedFields;
+ private boolean allFieldsFixedLength;
+ private long groupRecordCount;
+ private int recordsPerBatch;
+
+ /**
+ * Build the Parquet schema. The schema can be based on a "SELECT *",
+ * meaning we want all columns defined in the Parquet file. In this case,
+ * the list of selected columns is null. Or, the query can be based on
+ * an explicit list of selected columns. In this case, the
+ * columns need not exist in the Parquet file. If a column does not
exist,
+ * the reader returns null for that column. If no selected column exists
+ * in the file, then we return "mock" records: records with only null
+ * values, but repeated for the number of rows in the Parquet file.
+ *
+ * @param options session options
+ * @param rowGroupIndex row group to read
+ * @param selectedCols columns specified in the SELECT clause, or null if
+ * this is a SELECT * query
+ */
+
+ public ParquetSchema(OptionManager options, int rowGroupIndex,
Collection<SchemaPath> selectedCols) {
+ this.options = options;
+ this.rowGroupIndex = rowGroupIndex;
+ this.selectedCols = selectedCols;
+ if (selectedCols == null) {
+ columnsFound = null;
+ } else {
+ columnsFound = new boolean[selectedCols.size()];
+ }
+ }
+
+ /**
+ * Build the schema for this read as a combination of the schema
specified in
+ * the Parquet footer and the list of columns selected in the query.
+ *
+ * @param footer Parquet metadata
+ * @param batchSize target size of the batch, in rows
+ * @throws Exception if anything goes wrong
+ */
+
+ public void buildSchema(ParquetMetadata footer, long batchSize) throws
Exception {
+ this.footer = footer;
+ groupRecordCount = footer.getBlocks().get(rowGroupIndex).getRowCount();
+ loadParquetSchema();
+ computeFixedPart();
+
+ if (! selectedColumnMetadata.isEmpty() && allFieldsFixedLength) {
+ recordsPerBatch = (int) Math.min(Math.min(batchSize /
bitWidthAllFixedFields,
+ footer.getBlocks().get(0).getColumns().get(0).getValueCount()),
ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_FIXED_WIDTH);
+ }
+ else {
+ recordsPerBatch =
ParquetRecordReader.DEFAULT_RECORDS_TO_READ_IF_VARIABLE_WIDTH;
+ }
+ }
+
+ /**
+ * Scan the Parquet footer, then map each Parquet column to the list of
columns
+ * we want to read. Track those to be read.
+ */
+
+ private void loadParquetSchema() {
+ // TODO - figure out how to deal with this better once we add nested
reading, note also look where this map is used below
+ // store a map from column name to converted types if they are non-null
+ Map<String, SchemaElement> schemaElements =
ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+
+ // loop over the Parquet columns to build the list of selected columns
+ for (ColumnDescriptor column :
footer.getFileMetaData().getSchema().getColumns()) {
+ ParquetColumnMetadata columnMetadata = new
ParquetColumnMetadata(column);
+ columnMetadata.resolveDrillType(schemaElements, options);
+ if (! fieldSelected(columnMetadata.field)) {
+ continue;
+ }
+ selectedColumnMetadata.add(columnMetadata);
+ }
+ }
+
+ /**
+ * Fixed-width fields are the easiest to plan. We know the size of each
column,
+ * making it easy to determine the total length of each vector, once we
know
+ * the target record count. A special reader is used in the fortunate
case
+ * that all fields are fixed width.
+ */
+
+ private void computeFixedPart() {
+ allFieldsFixedLength = true;
+ for (ParquetColumnMetadata colMd : selectedColumnMetadata) {
+ if (colMd.isFixedLength()) {
+ bitWidthAllFixedFields += colMd.length;
+ } else {
+ allFieldsFixedLength = false;
+ }
+ }
+ }
+
+ public boolean isStarQuery() { return selectedCols == null; }
+ public ParquetMetadata footer() { return footer; }
+ public int getBitWidthAllFixedFields() { return bitWidthAllFixedFields; }
+ public int getRecordsPerBatch() { return recordsPerBatch; }
+ public boolean allFieldsFixedLength() { return allFieldsFixedLength; }
+ public List<ParquetColumnMetadata> getColumnMetadata() { return
selectedColumnMetadata; }
+
+ /**
+ * Return the row count of the selected Parquet row group.
+ *
+ * @return number of records in the Parquet row group
+ */
+
+ public long getGroupRecordCount() { return groupRecordCount; }
+
+ public BlockMetaData getRowGroupMetadata() {
+ return footer.getBlocks().get(rowGroupIndex);
+ }
+
+ /**
+ * Determine if a Parquet field is selected for the query. It is selected
+ * either if this is a star query (we want all columns), or the column
+ * appears in the select list.
+ *
+ * @param field the Parquet column expressed as a Drill field.
+ * @return true if the column is to be included in the scan, false
+ * if not
+ */
+
+ private boolean fieldSelected(MaterializedField field) {
+ // TODO - not sure if this is how we want to represent this
+ // for now it makes the existing tests pass, simply selecting
+ // all available data if no columns are provided
+ if (isStarQuery()) {
+ return true;
+ }
+
+ int i = 0;
+ for (SchemaPath expr : selectedCols) {
+ if ( field.getPath().equalsIgnoreCase(expr.getAsUnescapedPath())) {
+ columnsFound[i] = true;
+ return true;
+ }
+ i++;
+ }
+ return false;
+ }
+ /**
--- End diff --
Fixed.
> Refactor Parquet Record Reader
> ------------------------------
>
> Key: DRILL-5356
> URL: https://issues.apache.org/jira/browse/DRILL-5356
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Priority: Minor
> Labels: ready-to-commit
> Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend
> an uncomfortable amount of time trying to understand the code. In particular,
> this writer needs to figure out how to convince the reader to provide
> higher-density record batches.
> Rather than continue to decipher the complex code multiple times, this ticket
> requests to refactor the code to make it functionally identical, but
> structurally cleaner. The result will be faster time to value when working
> with this code.
> This is a lower-priority change and will be coordinated with others working
> on this code base. This ticket is only for the record reader class itself; it
> does not include the various readers and writers that Parquet uses since
> another project is actively modifying those classes.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)