[
https://issues.apache.org/jira/browse/DRILL-5356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954370#comment-15954370
]
ASF GitHub Bot commented on DRILL-5356:
---------------------------------------
Github user paul-rogers commented on a diff in the pull request:
https://github.com/apache/drill/pull/789#discussion_r109519725
--- Diff:
exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ReadState.java
---
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet.columnreaders;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.parquet.ParquetReaderStats;
+import org.apache.drill.exec.vector.NullableIntVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+
+/**
+ * Internal state for reading from a Parquet file.
+ */
+
+public class ReadState {
+ private final ParquetSchema schema;
+ ParquetReaderStats parquetReaderStats;
+ private VarLenBinaryReader varLengthReader;
+ // For columns not found in the file, we need to return a schema element with the correct number of values
+ // at that position in the schema. Currently this requires a vector be present. Here is a list of all of these vectors
+ // that need only have their value count set at the end of each call to next(), as the values default to null.
+ private List<NullableIntVector> nullFilledVectors;
+ // Keeps track of the number of records returned in the case where only columns outside of the file were selected.
+ // No actual data needs to be read out of the file, we only need to return batches until we have 'read' the number of
+ // records specified in the row group metadata
+ long mockRecordsRead;
+ private List<ColumnReader<?>> columnStatuses = new ArrayList<>();
+ private long numRecordsToRead; // number of records to read
+ private long totalRecordsRead;
+ boolean useAsyncColReader;
+
+ public ReadState(ParquetSchema schema, ParquetReaderStats
parquetReaderStats, long numRecordsToRead, boolean useAsyncColReader) {
+ this.schema = schema;
+ this.parquetReaderStats = parquetReaderStats;
+ this.useAsyncColReader = useAsyncColReader;
+ if (! schema.isStarQuery()) {
+ nullFilledVectors = new ArrayList<>();
+ }
+ mockRecordsRead = 0;
+ // Callers can pass -1 if they want to read all rows.
+ if (numRecordsToRead ==
ParquetRecordReader.NUM_RECORDS_TO_READ_NOT_SPECIFIED) {
+ this.numRecordsToRead = schema.rowCount();
+ } else {
+ assert (numRecordsToRead >= 0);
+ this.numRecordsToRead = Math.min(numRecordsToRead,
schema.rowCount());
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ public void buildReader(ParquetRecordReader reader, OutputMutator
output) throws Exception {
+ final ArrayList<VarLengthColumn<? extends ValueVector>>
varLengthColumns = new ArrayList<>();
+ // initialize all of the column read status objects
+ BlockMetaData rowGroupMetadata = schema.getRowGroupMetadata();
+ Map<String, Integer> columnChunkMetadataPositionsInList =
schema.buildChunkMap(rowGroupMetadata);
+ for (ParquetColumnMetadata colMd : schema.getColumnMetadata()) {
+ ColumnDescriptor column = colMd.column;
+ colMd.columnChunkMetaData = rowGroupMetadata.getColumns().get(
+
columnChunkMetadataPositionsInList.get(Arrays.toString(column.getPath())));
+ colMd.buildVector(output);
+ if (! colMd.isFixedLength( )) {
+ // create a reader and add it to the appropriate list
+ varLengthColumns.add(colMd.makeVariableWidthReader(reader));
+ } else if (colMd.isRepeated()) {
+ varLengthColumns.add(colMd.makeRepeatedFixedWidthReader(reader,
schema.getRecordsPerBatch()));
+ }
+ else {
+ columnStatuses.add(colMd.makeFixedWidthReader(reader,
schema.getRecordsPerBatch()));
+ }
+ }
+ varLengthReader = new VarLenBinaryReader(reader, varLengthColumns);
+ if (! schema.isStarQuery()) {
+ schema.createNonExistentColumns(output, nullFilledVectors);
+ }
+ }
+
+ public ColumnReader<?> getFirstColumnStatus() {
+ if (columnStatuses.size() > 0) {
+ return columnStatuses.get(0);
+ }
+ else if (varLengthReader.columns.size() > 0) {
+ return varLengthReader.columns.get(0);
+ } else {
+ return null;
+ }
+ }
+
+ public void resetBatch() {
+ for (final ColumnReader<?> column : columnStatuses) {
+ column.valuesReadInCurrentPass = 0;
+ }
+ for (final VarLengthColumn<?> r : varLengthReader.columns) {
+ r.valuesReadInCurrentPass = 0;
+ }
+ }
+
+ public ParquetSchema schema() { return schema; }
+ public List<ColumnReader<?>> getReaders() { return columnStatuses; }
--- End diff --
Fixed. I also renamed "columnStatuses" since I could never figure out what
that meant. It is now "columnReaders".
> Refactor Parquet Record Reader
> ------------------------------
>
> Key: DRILL-5356
> URL: https://issues.apache.org/jira/browse/DRILL-5356
> Project: Apache Drill
> Issue Type: Improvement
> Affects Versions: 1.10.0, 1.11.0
> Reporter: Paul Rogers
> Assignee: Paul Rogers
> Priority: Minor
> Fix For: 1.11.0
>
>
> The Parquet record reader class is a key part of Drill that has evolved over
> time to become somewhat hard to follow.
> A number of us are working on Parquet-related tasks and find we have to spend
> an uncomfortable amount of time trying to understand the code. In particular,
> this writer needs to figure out how to convince the reader to provide
> higher-density record batches.
> Rather than continue to decipher the complex code multiple times, this ticket
> requests to refactor the code to make it functionally identical, but
> structurally cleaner. The result will be faster time to value when working
> with this code.
> This is a lower-priority change and will be coordinated with others working
> on this code base. This ticket is only for the record reader class itself; it
> does not include the various readers and writers that Parquet uses since
> another project is actively modifying those classes.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)