rdblue commented on a change in pull request #723: Arrow changes for supporting vectorized reads
URL: https://github.com/apache/incubator-iceberg/pull/723#discussion_r387370583
 
 

 ##########
 File path: arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
 ##########
 @@ -0,0 +1,471 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.arrow.vectorized.parquet;
+
+import java.io.IOException;
+import org.apache.arrow.vector.DecimalVector;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarBinaryVector;
+import org.apache.iceberg.arrow.vectorized.NullabilityHolder;
+import org.apache.iceberg.parquet.BasePageIterator;
+import org.apache.iceberg.parquet.ParquetUtil;
+import org.apache.iceberg.parquet.ValuesAsBytesReader;
+import org.apache.parquet.CorruptDeltaByteArrays;
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.values.RequiresPreviousReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
+
+public class VectorizedPageIterator extends BasePageIterator {
+  private final boolean setArrowValidityVector;
+
+  public VectorizedPageIterator(ColumnDescriptor desc, String writerVersion, boolean setValidityVector) {
+    super(desc, writerVersion);
+    this.setArrowValidityVector = setValidityVector;
+  }
+
+  private boolean eagerDecodeDictionary;
+  private ValuesAsBytesReader plainValuesReader = null;
+  private VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader = null;
+  private boolean allPagesDictEncoded;
+  private VectorizedParquetDefinitionLevelReader vectorizedDefinitionLevelReader;
+
+  public void setAllPagesDictEncoded(boolean allDictEncoded) {
+    this.allPagesDictEncoded = allDictEncoded;
+  }
+
+  @Override
+  protected void reset() {
+    super.reset();
+    this.plainValuesReader = null;
+    this.vectorizedDefinitionLevelReader = null;
+  }
+
+  /**
+   * Method for reading a batch of dictionary ids from the dictionary encoded data pages. Like definition levels,
+   * dictionary ids in Parquet are RLE/bin-packed encoded as well.
+   */
+  public int nextBatchDictionaryIds(
+      final IntVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    vectorizedDefinitionLevelReader.readBatchOfDictionaryIds(
+        vector,
+        numValsInVector,
+        actualBatchSize,
+        holder,
+        dictionaryEncodedValuesReader);
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of INT32 data type
+   */
+  public int nextBatchIntegers(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedIntegers(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfIntegers(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of INT64 data type
+   */
+  public int nextBatchLongs(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedLongs(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfLongs(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of TIMESTAMP_MILLIS data type. In Iceberg, TIMESTAMP
+   * is always represented in microseconds. So we multiply values stored in millis by 1000
+   * before writing them to the vector.
+   */
+  public int nextBatchTimestampMillis(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedTimestampMillis(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfTimestampMillis(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of FLOAT data type.
+   */
+  public int nextBatchFloats(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFloats(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfFloats(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of values of DOUBLE data type
+   */
+  public int nextBatchDoubles(
+      final FieldVector vector, final int expectedBatchSize,
+      final int numValsInVector,
+      final int typeWidth, NullabilityHolder holder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedDoubles(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfDoubles(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          holder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  private int getActualBatchSize(int expectedBatchSize) {
+    return Math.min(expectedBatchSize, triplesCount - triplesRead);
+  }
+
+  /**
+   * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since Arrow stores all
+   * decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
+   */
+  public int nextBatchIntLongBackedDecimal(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader
+          .readBatchOfDictionaryEncodedIntLongBackedDecimals(
+              vector,
+              numValsInVector,
+              typeWidth,
+              actualBatchSize,
+              nullabilityHolder,
+              dictionaryEncodedValuesReader,
+              dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfIntLongBackedDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of decimals backed by fixed length byte array parquet data type. Arrow stores all
+   * decimals in 16 bytes. This method provides the necessary padding to the decimals read. Moreover, Arrow interprets
+   * the decimals in the Arrow buffer as little endian, whereas Parquet stores fixed length decimals as big endian. So,
+   * this method uses {@link DecimalVector#setBigEndian(int, byte[])} so that the data in the Arrow vector is indeed
+   * little endian.
+   */
+  public int nextBatchFixedLengthDecimal(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedLengthDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfFixedLengthDecimals(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading a batch of variable width data types (ENUM, JSON, UTF8, BSON).
+   */
+  public int nextBatchVarWidthType(
+      final FieldVector vector,
+      final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedVarWidth(
+          vector,
+          numValsInVector,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchVarWidth(
+          vector,
+          numValsInVector,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading batches of fixed width binary type (e.g. BYTE[7]). Spark does not support the fixed width
+   * binary data type. To work around this limitation, the data is read as fixed width binary from parquet and stored
+   * in a {@link VarBinaryVector} in Arrow.
+   */
+  public int nextBatchFixedWidthBinary(
+      final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+      final int typeWidth, NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (eagerDecodeDictionary) {
+      vectorizedDefinitionLevelReader.readBatchOfDictionaryEncodedFixedWidthBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          dictionaryEncodedValuesReader,
+          dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfFixedWidthBinary(
+          vector,
+          numValsInVector,
+          typeWidth,
+          actualBatchSize,
+          nullabilityHolder,
+          plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  /**
+   * Method for reading batches of booleans.
+   */
+  public int nextBatchBoolean(
+      final FieldVector vector,
+      final int expectedBatchSize,
+      final int numValsInVector,
+      NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    vectorizedDefinitionLevelReader
+        .readBatchOfBooleans(vector, numValsInVector, actualBatchSize,
+            nullabilityHolder, plainValuesReader);
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
+  @Override
+  protected void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
+    ValuesReader previousReader = plainValuesReader;
+    this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dictionary != null &&
+        (ParquetUtil.isIntType(desc.getPrimitiveType()) || !allPagesDictEncoded);
 
 Review comment:
   @samarthjain, in a follow-up, let's update the logic here to something other than just eagerly decoding integers. We probably want to do this for 4-byte floats, for example.
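
   A rough sketch of how that follow-up might widen the check (not part of this PR; the
   isEagerDecodeType helper below is hypothetical and assumes we want to eagerly decode all
   fixed 4-byte primitives):

       import org.apache.parquet.schema.PrimitiveType;

       // Hypothetical helper: eagerly decode dictionary-encoded values for fixed 4-byte
       // primitives (INT32 and FLOAT) rather than integers only.
       private static boolean isEagerDecodeType(PrimitiveType primitive) {
         PrimitiveType.PrimitiveTypeName typeName = primitive.getPrimitiveTypeName();
         return typeName == PrimitiveType.PrimitiveTypeName.INT32 ||
             typeName == PrimitiveType.PrimitiveTypeName.FLOAT;
       }

       // initDataReader would then swap this helper in for ParquetUtil.isIntType:
       // this.eagerDecodeDictionary = dataEncoding.usesDictionary() && dictionary != null &&
       //     (isEagerDecodeType(desc.getPrimitiveType()) || !allPagesDictEncoded);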

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@iceberg.apache.org
For additional commands, e-mail: issues-h...@iceberg.apache.org
