rdblue commented on a change in pull request #723: Arrow changes for supporting vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/723#discussion_r387370583
##########
File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
##########
@@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.io.IOException;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.BasePageIterator;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.parquet.ValuesAsBytesReader;
+import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class VectorizedPageIterator extends BasePageIterator {
+  private final boolean setArrowValidityVector;
+
+  public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, boolean setValidityVector) {
+    super(desc, writerVersion);
+    this.setArrowValidityVector = setValidityVector;
+  }
+
+  private boolean eagerDecodeDictionary;
+  private ValuesAsBytesReader plainValuesReader = null;
+  private VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader = null;
+  private boolean allPagesDictEncoded;
+  private VectorizedParquetDefinitionLevelReader vectorizedDefinitionLevelReader;
+
+  public void setAllPagesDictEncoded(boolean allDictEncoded) {
+    this.allPagesDictEncoded = allDictEncoded;
+  }
+
+  @Override
+  protected void reset() {
+    super.reset();
+    this.plainValuesReader = null;
+    this.vectorizedDefinitionLevelReader = null;
+  }
+
+  /**
+   * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like definition levels,
+   * dictionary ids in Parquet are RLE/bin-packed encoded as well.
+   */
+  public int nextBatchDictionaryIds(
+      final IntVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    vectorizedDefinitionLevelReader.readBatchOfDictionaryIds(
+        vector,
+        numValsInVector,
+        actualBatchSize,
+        holder,
+        dictionaryEncodedValuesReader);
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of INT32 data type
+   */
+  public int nextBatchIntegers(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedIntegers(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfIntegers(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of INT64 data type
+   */
+  public int nextBatchLongs(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedLongs(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfLongs(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of TIMESTAMP_MILLIS data type. In Iceberg, TIMESTAMP
+   * is always represented in microseconds. So we multiply values stored in millis by 1000
+   * before writing them to the vector.
+   */
+  public int nextBatchTimestampMillis(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedTimestampMillis(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfTimestampMillis(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of FLOAT data type.
+   */
+  public int nextBatchFloats(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFloats(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfFloats(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of DOUBLE data type
+   */
+  public int nextBatchDoubles(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedDoubles(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfDoubles(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  private int getActualBatchSize(int expectedBatchSize) {
+    return Math.min(expectedBatchSize, triplesCount - triplesRead);
+  }
+
+  /**
+   * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since Arrow stores all
+   * decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
+   */
+  public int nextBatchIntLongBackedDecimal(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader
+          .readBatchOfDictionaryEncodedIntLongBackedDecimals(
+              vector,
+              numValsInVector,
+              typeWidth,
+              actualBatchSize,
+              nullabilityHolder,
+              dictionaryEncodedValuesReader,
+              dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfIntLongBackedDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of decimals backed by fixed length byte array parquet data type. Arrow stores all
+   * decimals in 16 bytes. This method provides the necessary padding to the decimals read. Moreover, Arrow interprets
+   * the decimals in Arrow buffer as little endian. Parquet stores fixed length decimals as big endian. So, this method
+   * uses {@link DecimalVector#setBigEndian(int, byte[])} method so that the data in Arrow vector is indeed little
+   * endian.
+   */
+  public int nextBatchFixedLengthDecimal(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfFixedLengthDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of variable width data type (ENUM, JSON, UTF8, BSON).
+   */
+  public int nextBatchVarWidthType(
+      final FieldVector vector,
+      final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedVarWidth(
+          vector,
+          numValsInVector,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchVarWidth(
+          vector,
+          numValsInVector,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading batches of fixed width binary type (e.g. BYTE[7]). Spark does not support fixed width binary
+   * data type. To work around this limitation, the data is read as fixed width binary from parquet and stored in a
+   * {@link VarBinaryVector} in Arrow.
+   */
+  public int nextBatchFixedWidthBinary(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfFixedWidthBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading batches of booleans.
+   */
+  public int nextBatchBoolean(
+      final FieldVector vector,
+      final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    vectorizedDefinitionLevelReader
+        .readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
+            nullabilityHolder, plainValuesReader);
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  @Override
+  protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
+    ValuesReader previousReader = plainValuesReader;
+    this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dictionary != null &&
+        (ParquetUtil.isIntType(desc.getPrimitiveType()) || !allPagesDictEncoded);

Review comment:
   @samarthjain, in a follow-up, let's update the logic here to something other than just eagerly decoding integers. We probably want to do this for 4-byte floats, for example.
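As an illustration of the kind of follow-up being suggested here, one possible shape is sketched below. The shouldEagerlyDecode helper is hypothetical (it is not part of this PR or of ParquetUtil); the exact predicate and where it should live are open questions for that follow-up.

    import org.apache.parquet.schema.PrimitiveType;

    // Hypothetical helper: eagerly decode dictionary-encoded pages whenever the decoded
    // value is no wider than the 4-byte dictionary id, instead of only for int types.
    private static boolean shouldEagerlyDecode(PrimitiveType primitive) {
      switch (primitive.getPrimitiveTypeName()) {
        case INT32:
        case FLOAT:
          return true;   // 4-byte values: materializing them costs no more than keeping the ids
        default:
          return false;  // wider or variable-width types keep the existing lazy behavior
      }
    }

    // initDataReader would then set the flag as:
    // this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dictionary != null &&
    //     (shouldEagerlyDecode(desc.getPrimitiveType()) || !allPagesDictEncoded);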