This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 40295605aae6 feat(flink): Backport Flink 2.1 nested Parquet column readers and INT64 timestamp dispatch (FLINK-35702) (#18636)
40295605aae6 is described below
commit 40295605aae691da07e93a1a2919d38b41cbb6dd
Author: Shihuan Liu <[email protected]>
AuthorDate: Wed May 6 23:23:48 2026 -0700
feat(flink): Backport Flink 2.1 nested Parquet column readers and INT64 timestamp dispatch (FLINK-35702) (#18636)
* feat(flink): Backport Flink 2.1 nested Parquet column readers and INT64 timestamp dispatch (FLINK-35702)
* Minor fixes
---
.../table/format/cow/vector/HeapArrayVector.java | 31 +
.../format/cow/vector/HeapMapColumnVector.java | 63 +-
.../format/cow/vector/HeapRowColumnVector.java | 15 +
.../cow/vector/reader/NestedColumnReader.java | 257 +++++++++
.../vector/reader/NestedPrimitiveColumnReader.java | 639 +++++++++++++++++++++
.../reader/ParquetDataColumnReaderFactory.java | 108 +++-
.../cow/vector/TestHeapColumnVectorAccessors.java | 138 +++++
.../reader/TestParquetDataColumnReaderFactory.java | 272 +++++++++
8 files changed, 1516 insertions(+), 7 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
index a0dced01e5e8..2f21a323302f 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -57,6 +57,37 @@ public class HeapArrayVector extends AbstractHeapVector
return this.isNull.length;
}
+ // ---------------------------------------------------------------------------------------------
+ // Flink 2.1-compatible accessors. Backed by the existing public {@code offsets}, {@code lengths}
+ // and {@code child} fields so legacy callers continue to work; the new {@link
+ // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port) and any
+ // future Flink-2.1-style caller use these accessors.
+ // ---------------------------------------------------------------------------------------------
+
+ public long[] getOffsets() {
+ return offsets;
+ }
+
+ public void setOffsets(long[] offsets) {
+ this.offsets = offsets;
+ }
+
+ public long[] getLengths() {
+ return lengths;
+ }
+
+ public void setLengths(long[] lengths) {
+ this.lengths = lengths;
+ }
+
+ public ColumnVector getChild() {
+ return child;
+ }
+
+ public void setChild(ColumnVector child) {
+ this.child = child;
+ }
+
@Override
public ArrayData getArray(int i) {
long offset = offsets[i];
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
index 0d83f82baedf..a98bdebd707a 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.cow.vector;
import lombok.Getter;
import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
import org.apache.flink.table.data.columnar.vector.MapColumnVector;
import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
@@ -31,16 +32,74 @@ public class HeapMapColumnVector extends AbstractHeapVector
implements WritableColumnVector, MapColumnVector {
@Getter
- private final WritableColumnVector keys;
+ private WritableColumnVector keys;
@Getter
- private final WritableColumnVector values;
+ private WritableColumnVector values;
+
+ // ---------------------------------------------------------------------------------------------
+ // Flink 2.1 Dremel-style state. Populated by {@link
+ // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port). The
+ // legacy {@link #getMap(int)} implementation below continues to use {@code ColumnarGroupMapData}
+ // — wiring it through these offsets/lengths happens in a follow-up PR that switches the read
+ // path. Left here so the new readers can compile against the additive surface.
+ // ---------------------------------------------------------------------------------------------
+ private long[] offsets;
+ private long[] lengths;
+ private int size;
public HeapMapColumnVector(int len, WritableColumnVector keys,
WritableColumnVector values) {
super(len);
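+ // Pre-size the Dremel offsets/lengths to the batch length; NestedColumnReader replaces them
+ // with computed positions.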
+ this.offsets = new long[len];
+ this.lengths = new long[len];
this.keys = keys;
this.values = values;
}
+ public long[] getOffsets() {
+ return offsets;
+ }
+
+ public void setOffsets(long[] offsets) {
+ this.offsets = offsets;
+ }
+
+ public long[] getLengths() {
+ return lengths;
+ }
+
+ public void setLengths(long[] lengths) {
+ this.lengths = lengths;
+ }
+
+ public int getSize() {
+ return size;
+ }
+
+ public void setSize(int size) {
+ this.size = size;
+ }
+
+ public void setKeys(WritableColumnVector keys) {
+ this.keys = keys;
+ }
+
+ public void setValues(WritableColumnVector values) {
+ this.values = values;
+ }
+
+ /**
+ * Returns the keys child vector typed as {@link ColumnVector}, matching the Flink 2.1 contract
+ * consumed by {@code NestedColumnReader}. Functionally equivalent to {@link #getKeys()}.
+ */
+ public ColumnVector getKeyColumnVector() {
+ return keys;
+ }
+
+ /** Counterpart of {@link #getKeyColumnVector()} for the values child vector. */
+ public ColumnVector getValueColumnVector() {
+ return values;
+ }
+
@Override
public MapData getMap(int rowId) {
return new ColumnarGroupMapData(keys, values, rowId);
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index ae194e4e6ab0..0c640ce92ee4 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -37,6 +37,21 @@ public class HeapRowColumnVector extends AbstractHeapVector
this.vectors = vectors;
}
+ /**
+ * Flink 2.1-compatible accessor for the children vectors. Backed by the existing public {@code
+ * vectors} field so legacy callers continue to work; the new {@link
+ * org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} (FLINK-35702 port) and any
+ * future Flink-2.1-style caller use this accessor.
+ */
+ public WritableColumnVector[] getFields() {
+ return vectors;
+ }
+
+ /** Counterpart of {@link #getFields()}. */
+ public void setFields(WritableColumnVector[] fields) {
+ this.vectors = fields;
+ }
+
@Override
public ColumnarRowData getRow(int i) {
ColumnarRowData columnarRowData = new ColumnarRowData(new VectorizedColumnBatch(vectors));
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
new file mode 100644
index 000000000000..27eab298b320
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.utils.NestedPositionUtil;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.position.CollectionPosition;
+import org.apache.hudi.table.format.cow.vector.position.LevelDelegation;
+import org.apache.hudi.table.format.cow.vector.position.RowPosition;
+import org.apache.hudi.table.format.cow.vector.type.ParquetField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetGroupField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetPrimitiveField;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ColumnReader used to read a {@code Group} type in Parquet ({@code Map}, {@code Array}, {@code
+ * Row}). Resolves nested structures using Dremel striping/assembly; see <a
+ * href="https://github.com/julienledem/redelm/wiki/The-striping-and-assembly-algorithms-from-the-Dremel-paper">the
+ * striping and assembly algorithms from the Dremel paper</a>.
+ *
+ * <p>Vendored from Apache Flink 2.1 (FLINK-35702, {@code
+ * org.apache.flink.formats.parquet.vector.reader.NestedColumnReader}). Differences vs. upstream:
+ *
+ * <ul>
+ * <li>Uses Hudi-local {@code HeapRowColumnVector}/{@code HeapMapColumnVector}/{@code
+ * HeapArrayVector} instead of the Flink-private {@code HeapRowVector}/{@code
+ * HeapMapVector}/{@code HeapArrayVector}.
+ * <li>Supports Hudi's schema-evolution contract: a {@code ParquetGroupField} representing a
+ * {@link RowType} may contain {@code null} children — meaning the corresponding logical
+ * field is absent from the Parquet file. Those slots are passed through unchanged and do
+ * not contribute to the row's repetition/definition-level stream.
+ * </ul>
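+ *
+ * <p>For intuition (illustrative example, assuming Parquet's standard 3-level LIST encoding, not
+ * upstream text): for an optional {@code LIST<optional INT32>} with maxRepetitionLevel=1 and
+ * maxDefinitionLevel=3, the rows {@code [1,2]}, {@code null} and {@code []} stripe to
+ * (repetition, definition) pairs (0,3)(1,3), (0,0) and (0,1); assembly reverses this mapping to
+ * rebuild offsets, lengths and null flags.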
+ */
+public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
+
+ private final Map<ColumnDescriptor, NestedPrimitiveColumnReader> columnReaders;
+ private final boolean isUtcTimestamp;
+
+ private final PageReadStore pages;
+
+ private final ParquetField field;
+
+ public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, ParquetField field) {
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.pages = pages;
+ this.field = field;
+ this.columnReaders = new HashMap<>();
+ }
+
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ readData(field, readNumber, vector, false);
+ }
+
+ private Tuple2<LevelDelegation, WritableColumnVector> readData(
+ ParquetField field, int readNumber, ColumnVector vector, boolean inside) throws IOException {
+ if (field.getType() instanceof RowType) {
+ return readRow((ParquetGroupField) field, readNumber, vector, inside);
+ } else if (field.getType() instanceof MapType || field.getType() instanceof MultisetType) {
+ return readMap((ParquetGroupField) field, readNumber, vector, inside);
+ } else if (field.getType() instanceof ArrayType) {
+ return readArray((ParquetGroupField) field, readNumber, vector, inside);
+ } else {
+ return readPrimitive((ParquetPrimitiveField) field, readNumber, vector);
+ }
+ }
+
+ private Tuple2<LevelDelegation, WritableColumnVector> readRow(
+ ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
+ throws IOException {
+ HeapRowColumnVector heapRowVector = (HeapRowColumnVector) vector;
+ LevelDelegation levelDelegation = null;
+ List<ParquetField> children = field.getChildren();
+ WritableColumnVector[] childrenVectors = heapRowVector.getFields();
+ WritableColumnVector[] finalChildrenVectors = new WritableColumnVector[childrenVectors.length];
+ for (int i = 0; i < children.size(); i++) {
+ ParquetField child = children.get(i);
+ if (child == null) {
+ // Hudi schema-evolution: the logical field is not present in the Parquet file. The slot
+ // vector is expected to be pre-populated with nulls by the caller (lands in a follow-up
+ // PR that rewires ParquetSplitReaderUtil); keep it as is and skip contributing to the
+ // level stream.
+ finalChildrenVectors[i] = childrenVectors[i];
+ continue;
+ }
+ Tuple2<LevelDelegation, WritableColumnVector> tuple = readData(child, readNumber, childrenVectors[i], true);
+ levelDelegation = tuple.f0;
+ finalChildrenVectors[i] = tuple.f1;
+ }
+ if (levelDelegation == null) {
+ throw new FlinkRuntimeException(
+ String.format("Row field does not have any non-null children: %s.",
field));
+ }
+
+ RowPosition rowPosition =
+ NestedPositionUtil.calculateRowOffsets(
+ field,
+ levelDelegation.getDefinitionLevel(),
+ levelDelegation.getRepetitionLevel());
+
+ // If row was inside the structure, then we need to renew the vector to reset the
+ // capacity.
+ if (inside) {
+ heapRowVector = new HeapRowColumnVector(rowPosition.getPositionsCount(), finalChildrenVectors);
+ } else {
+ heapRowVector.setFields(finalChildrenVectors);
+ }
+
+ if (rowPosition.getIsNull() != null) {
+ setFieldNullFlag(rowPosition.getIsNull(), heapRowVector);
+ }
+ return Tuple2.of(levelDelegation, heapRowVector);
+ }
+
+ private Tuple2<LevelDelegation, WritableColumnVector> readMap(
+ ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
+ throws IOException {
+ HeapMapColumnVector mapVector = (HeapMapColumnVector) vector;
+ mapVector.reset();
+ List<ParquetField> children = field.getChildren();
+ Preconditions.checkArgument(
+ children.size() == 2,
+ "Maps must have two type parameters, found %s",
+ children.size());
+ Tuple2<LevelDelegation, WritableColumnVector> keyTuple =
+ readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), true);
+ Tuple2<LevelDelegation, WritableColumnVector> valueTuple =
+ readData(children.get(1), readNumber, mapVector.getValueColumnVector(), true);
+
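+ // Map keys are required in Parquet, so the key column's levels drive the offset/length
+ // calculation for the whole map.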
+ LevelDelegation levelDelegation = keyTuple.f0;
+
+ CollectionPosition collectionPosition =
+ NestedPositionUtil.calculateCollectionOffsets(
+ field,
+ levelDelegation.getDefinitionLevel(),
+ levelDelegation.getRepetitionLevel());
+
+ // If map was inside the structure, then we need to renew the vector to reset the
+ // capacity.
+ if (inside) {
+ mapVector = new HeapMapColumnVector(collectionPosition.getValueCount(), keyTuple.f1, valueTuple.f1);
+ } else {
+ mapVector.setKeys(keyTuple.f1);
+ mapVector.setValues(valueTuple.f1);
+ }
+
+ if (collectionPosition.getIsNull() != null) {
+ setFieldNullFlag(collectionPosition.getIsNull(), mapVector);
+ }
+
+ mapVector.setLengths(collectionPosition.getLength());
+ mapVector.setOffsets(collectionPosition.getOffsets());
+
+ return Tuple2.of(levelDelegation, mapVector);
+ }
+
+ private Tuple2<LevelDelegation, WritableColumnVector> readArray(
+ ParquetGroupField field, int readNumber, ColumnVector vector, boolean inside)
+ throws IOException {
+ HeapArrayVector arrayVector = (HeapArrayVector) vector;
+ arrayVector.reset();
+ List<ParquetField> children = field.getChildren();
+ Preconditions.checkArgument(
+ children.size() == 1,
+ "Arrays must have a single type parameter, found %s",
+ children.size());
+ Tuple2<LevelDelegation, WritableColumnVector> tuple =
+ readData(children.get(0), readNumber, arrayVector.getChild(), true);
+
+ LevelDelegation levelDelegation = tuple.f0;
+ CollectionPosition collectionPosition =
+ NestedPositionUtil.calculateCollectionOffsets(
+ field,
+ levelDelegation.getDefinitionLevel(),
+ levelDelegation.getRepetitionLevel());
+
+ // If array was inside the structure, then we need to renew the vector to reset the
+ // capacity.
+ if (inside) {
+ arrayVector = new HeapArrayVector(collectionPosition.getValueCount(), tuple.f1);
+ } else {
+ arrayVector.setChild(tuple.f1);
+ }
+
+ if (collectionPosition.getIsNull() != null) {
+ setFieldNullFlag(collectionPosition.getIsNull(), arrayVector);
+ }
+ arrayVector.setLengths(collectionPosition.getLength());
+ arrayVector.setOffsets(collectionPosition.getOffsets());
+ return Tuple2.of(levelDelegation, arrayVector);
+ }
+
+ private Tuple2<LevelDelegation, WritableColumnVector> readPrimitive(
+ ParquetPrimitiveField field, int readNumber, ColumnVector vector) throws IOException {
+ ColumnDescriptor descriptor = field.getDescriptor();
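+ // Leaf readers are cached per column descriptor so successive batches reuse page state and
+ // any dictionary.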
+ NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
+ if (reader == null) {
+ reader =
+ new NestedPrimitiveColumnReader(
+ descriptor,
+ pages.getPageReader(descriptor),
+ isUtcTimestamp,
+ descriptor.getPrimitiveType(),
+ field.getType());
+ columnReaders.put(descriptor, reader);
+ }
+ WritableColumnVector writableColumnVector =
+ reader.readAndNewVector(readNumber, (WritableColumnVector) vector);
+ return Tuple2.of(reader.getLevelDelegation(), writableColumnVector);
+ }
+
+ private static void setFieldNullFlag(boolean[] nullFlags, AbstractHeapVector vector) {
+ for (int index = 0; index < vector.getLen() && index < nullFlags.length; index++) {
+ if (nullFlags[index]) {
+ vector.setNullAt(index);
+ }
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
new file mode 100644
index 000000000000..3f0aefe2af74
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.utils.IntArrayList;
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.position.LevelDelegation;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapTimestampVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * Reader to read a single primitive leaf column that participates in a nested (Dremel) structure.
+ *
+ * <p>Vendored from Apache Flink 2.1 (FLINK-35702, {@code
+ * org.apache.flink.formats.parquet.vector.reader.NestedPrimitiveColumnReader}). Only the package
+ * and the Hudi-local {@link ParquetDecimalVector} / {@link LevelDelegation} / {@link IntArrayList}
+ * imports are changed; the algorithm is untouched. The companion Hudi-specific {@code
+ * Int64TimestampColumnReader} / {@code FixedLenBytesColumnReader} behaviours stay at the
+ * leaf-reader creation boundary in {@code ParquetSplitReaderUtil} (lands in a follow-up PR), not
+ * inside this class — keeping it a faithful copy of upstream.
+ */
+public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnVector> {
+ private static final Logger LOG = LoggerFactory.getLogger(NestedPrimitiveColumnReader.class);
+
+ private final IntArrayList repetitionLevelList = new IntArrayList(0);
+ private final IntArrayList definitionLevelList = new IntArrayList(0);
+
+ private final PageReader pageReader;
+ private final ColumnDescriptor descriptor;
+ private final Type type;
+ private final LogicalType logicalType;
+
+ /** The dictionary, if this column has dictionary encoding. */
+ private final ParquetDataColumnReader dictionary;
+
+ /** Maximum definition level for this column. */
+ private final int maxDefLevel;
+
+ private boolean isUtcTimestamp;
+
+ /** Total number of values read. */
+ private long valuesRead;
+
+ /**
+ * value that indicates the end of the current page. That is, if valuesRead ==
+ * endOfPageValueCount, we are at the end of the page.
+ */
+ private long endOfPageValueCount;
+
+ /** If true, the current page is dictionary encoded. */
+ private boolean isCurrentPageDictionaryEncoded;
+
+ private int definitionLevel;
+ private int repetitionLevel;
+
+ /** Repetition/Definition/Value readers. */
+ private IntIterator repetitionLevelColumn;
+
+ private IntIterator definitionLevelColumn;
+ private ParquetDataColumnReader dataColumn;
+
+ /** Total values in the current page. */
+ private int pageValueCount;
+
+ // flag to indicate if there is no data in parquet data page
+ private boolean eof = false;
+
+ private boolean isFirstRow = true;
+
+ private Object lastValue;
+
+ public NestedPrimitiveColumnReader(
+ ColumnDescriptor descriptor,
+ PageReader pageReader,
+ boolean isUtcTimestamp,
+ Type parquetType,
+ LogicalType logicalType)
+ throws IOException {
+ this.descriptor = descriptor;
+ this.type = parquetType;
+ this.pageReader = pageReader;
+ this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.logicalType = logicalType;
+
+ DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+ if (dictionaryPage != null) {
+ try {
+ this.dictionary =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+ parquetType.asPrimitiveType(),
+ dictionaryPage.getEncoding().initDictionary(descriptor, dictionaryPage),
+ isUtcTimestamp);
+ this.isCurrentPageDictionaryEncoded = true;
+ } catch (IOException e) {
+ throw new IOException(
+ String.format("Could not decode the dictionary for %s",
descriptor), e);
+ }
+ } else {
+ this.dictionary = null;
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+ }
+
+ // Not invoked directly; callers use readAndNewVector instead.
+ @Override
+ public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
+ throw new UnsupportedOperationException("This function should not be called.");
+ }
+
+ public WritableColumnVector readAndNewVector(int readNumber, WritableColumnVector vector)
+ throws IOException {
+ if (isFirstRow) {
+ if (!readValue()) {
+ return vector;
+ }
+ isFirstRow = false;
+ }
+
+ // index to set value.
+ int index = 0;
+ int valueIndex = 0;
+ List<Object> valueList = new ArrayList<>();
+
+ // Repeated types need two loops: the outer loop advances one top-level row per iteration,
+ // and the inner do-while gathers values until repetitionLevel returns to 0 (the next row).
+ while (!eof && index < readNumber) {
+ do {
+ valueList.add(lastValue);
+ valueIndex++;
+ } while (readValue() && (repetitionLevel != 0));
+ index++;
+ }
+
+ return fillColumnVector(valueIndex, valueList);
+ }
+
+ public LevelDelegation getLevelDelegation() {
+ int[] repetition = repetitionLevelList.toArray();
+ int[] definition = definitionLevelList.toArray();
+ repetitionLevelList.clear();
+ definitionLevelList.clear();
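+ // The trailing look-ahead readValue() already recorded the first levels of the next batch;
+ // re-seed them so that entry is not lost on the next call.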
+ repetitionLevelList.add(repetitionLevel);
+ definitionLevelList.add(definitionLevel);
+ return new LevelDelegation(repetition, definition);
+ }
+
+ private boolean readValue() throws IOException {
+ int left = readPageIfNeed();
+ if (left > 0) {
+ // get the values of repetition and definitionLevel
+ readAndSaveRepetitionAndDefinitionLevels();
+ // read the data if it isn't null
+ if (definitionLevel == maxDefLevel) {
+ if (isCurrentPageDictionaryEncoded) {
+ int dictionaryId = dataColumn.readValueDictionaryId();
+ lastValue = dictionaryDecodeValue(logicalType, dictionaryId);
+ } else {
+ lastValue = readPrimitiveTypedRow(logicalType);
+ }
+ } else {
+ lastValue = null;
+ }
+ return true;
+ } else {
+ eof = true;
+ return false;
+ }
+ }
+
+ private void readAndSaveRepetitionAndDefinitionLevels() {
+ // get the values of repetition and definitionLevel
+ repetitionLevel = repetitionLevelColumn.nextInt();
+ definitionLevel = definitionLevelColumn.nextInt();
+ valuesRead++;
+ repetitionLevelList.add(repetitionLevel);
+ definitionLevelList.add(definitionLevel);
+ }
+
+ private int readPageIfNeed() throws IOException {
+ // Compute the number of values we want to read in this page.
+ int leftInPage = (int) (endOfPageValueCount - valuesRead);
+ if (leftInPage == 0) {
+ // no data left in current page, load data from new page
+ readPage();
+ leftInPage = (int) (endOfPageValueCount - valuesRead);
+ }
+ return leftInPage;
+ }
+
+ private Object readPrimitiveTypedRow(LogicalType category) {
+ switch (category.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return dataColumn.readBytes();
+ case BOOLEAN:
+ return dataColumn.readBoolean();
+ case TIME_WITHOUT_TIME_ZONE:
+ case DATE:
+ case INTEGER:
+ return dataColumn.readInteger();
+ case TINYINT:
+ return dataColumn.readTinyInt();
+ case SMALLINT:
+ return dataColumn.readSmallInt();
+ case BIGINT:
+ return dataColumn.readLong();
+ case FLOAT:
+ return dataColumn.readFloat();
+ case DOUBLE:
+ return dataColumn.readDouble();
+ case DECIMAL:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return dataColumn.readInteger();
+ case INT64:
+ return dataColumn.readLong();
+ case BINARY:
+ case FIXED_LEN_BYTE_ARRAY:
+ return dataColumn.readBytes();
+ default:
+ throw new RuntimeException(
+ "Unsupported physical type for DECIMAL: " +
descriptor.getPrimitiveType());
+ }
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return dataColumn.readTimestamp();
+ default:
+ throw new RuntimeException("Unsupported type in the list: " + type);
+ }
+ }
+
+ private Object dictionaryDecodeValue(LogicalType category, Integer dictionaryValue) {
+ if (dictionaryValue == null) {
+ return null;
+ }
+
+ switch (category.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ return dictionary.readBytes(dictionaryValue);
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ case INTEGER:
+ return dictionary.readInteger(dictionaryValue);
+ case BOOLEAN:
+ return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
+ case DOUBLE:
+ return dictionary.readDouble(dictionaryValue);
+ case FLOAT:
+ return dictionary.readFloat(dictionaryValue);
+ case TINYINT:
+ return dictionary.readTinyInt(dictionaryValue);
+ case SMALLINT:
+ return dictionary.readSmallInt(dictionaryValue);
+ case BIGINT:
+ return dictionary.readLong(dictionaryValue);
+ case DECIMAL:
+ switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+ case INT32:
+ return dictionary.readInteger(dictionaryValue);
+ case INT64:
+ return dictionary.readLong(dictionaryValue);
+ case FIXED_LEN_BYTE_ARRAY:
+ case BINARY:
+ return dictionary.readBytes(dictionaryValue);
+ default:
+ throw new RuntimeException(
+ "Unsupported physical type for DECIMAL: " +
descriptor.getPrimitiveType());
+ }
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ return dictionary.readTimestamp(dictionaryValue);
+ default:
+ throw new RuntimeException("Unsupported type in the list: " + type);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private WritableColumnVector fillColumnVector(int total, List valueList) {
+ switch (logicalType.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ case BINARY:
+ case VARBINARY:
+ HeapBytesVector heapBytesVector = new HeapBytesVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ byte[] src = ((List<byte[]>) valueList).get(i);
+ if (src == null) {
+ heapBytesVector.setNullAt(i);
+ } else {
+ heapBytesVector.appendBytes(i, src, 0, src.length);
+ }
+ }
+ return heapBytesVector;
+ case BOOLEAN:
+ HeapBooleanVector heapBooleanVector = new HeapBooleanVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapBooleanVector.setNullAt(i);
+ } else {
+ heapBooleanVector.vector[i] = ((List<Boolean>) valueList).get(i);
+ }
+ }
+ return heapBooleanVector;
+ case TINYINT:
+ HeapByteVector heapByteVector = new HeapByteVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapByteVector.setNullAt(i);
+ } else {
+ heapByteVector.vector[i] = (byte) ((List<Integer>) valueList).get(i).intValue();
+ }
+ }
+ return heapByteVector;
+ case SMALLINT:
+ HeapShortVector heapShortVector = new HeapShortVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapShortVector.setNullAt(i);
+ } else {
+ heapShortVector.vector[i] = (short) ((List<Integer>) valueList).get(i).intValue();
+ }
+ }
+ return heapShortVector;
+ case INTEGER:
+ case DATE:
+ case TIME_WITHOUT_TIME_ZONE:
+ HeapIntVector heapIntVector = new HeapIntVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapIntVector.setNullAt(i);
+ } else {
+ heapIntVector.vector[i] = ((List<Integer>) valueList).get(i);
+ }
+ }
+ return heapIntVector;
+ case FLOAT:
+ HeapFloatVector heapFloatVector = new HeapFloatVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapFloatVector.setNullAt(i);
+ } else {
+ heapFloatVector.vector[i] = ((List<Float>) valueList).get(i);
+ }
+ }
+ return heapFloatVector;
+ case BIGINT:
+ HeapLongVector heapLongVector = new HeapLongVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapLongVector.setNullAt(i);
+ } else {
+ heapLongVector.vector[i] = ((List<Long>) valueList).get(i);
+ }
+ }
+ return heapLongVector;
+ case DOUBLE:
+ HeapDoubleVector heapDoubleVector = new HeapDoubleVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapDoubleVector.setNullAt(i);
+ } else {
+ heapDoubleVector.vector[i] = ((List<Double>) valueList).get(i);
+ }
+ }
+ return heapDoubleVector;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ HeapTimestampVector heapTimestampVector = new HeapTimestampVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ heapTimestampVector.setNullAt(i);
+ } else {
+ heapTimestampVector.setTimestamp(i, ((List<TimestampData>) valueList).get(i));
+ }
+ }
+ return heapTimestampVector;
+ case DECIMAL:
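+ // ParquetDecimalVector wraps the physical vector (int32/int64/binary); decimal scale is
+ // applied by the accessors at read time.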
+ PrimitiveType.PrimitiveTypeName primitiveTypeName = descriptor.getPrimitiveType().getPrimitiveTypeName();
+ switch (primitiveTypeName) {
+ case INT32:
+ HeapIntVector phiv = new HeapIntVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ phiv.setNullAt(i);
+ } else {
+ phiv.vector[i] = ((List<Integer>) valueList).get(i);
+ }
+ }
+ return new ParquetDecimalVector(phiv);
+ case INT64:
+ HeapLongVector phlv = new HeapLongVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ if (valueList.get(i) == null) {
+ phlv.setNullAt(i);
+ } else {
+ phlv.vector[i] = ((List<Long>) valueList).get(i);
+ }
+ }
+ return new ParquetDecimalVector(phlv);
+ default:
+ HeapBytesVector phbv = getHeapBytesVector(total, valueList);
+ return new ParquetDecimalVector(phbv);
+ }
+ default:
+ throw new RuntimeException("Unsupported type in the list: " + type);
+ }
+ }
+
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ private static HeapBytesVector getHeapBytesVector(int total, List valueList) {
+ HeapBytesVector phbv = new HeapBytesVector(total);
+ for (int i = 0; i < valueList.size(); i++) {
+ byte[] src = ((List<byte[]>) valueList).get(i);
+ if (valueList.get(i) == null) {
+ phbv.setNullAt(i);
+ } else {
+ phbv.appendBytes(i, src, 0, src.length);
+ }
+ }
+ return phbv;
+ }
+
+ protected void readPage() {
+ DataPage page = pageReader.readPage();
+
+ if (page == null) {
+ return;
+ }
+
+ page.accept(
+ new DataPage.Visitor<Void>() {
+ @Override
+ public Void visit(DataPageV1 dataPageV1) {
+ readPageV1(dataPageV1);
+ return null;
+ }
+
+ @Override
+ public Void visit(DataPageV2 dataPageV2) {
+ readPageV2(dataPageV2);
+ return null;
+ }
+ });
+ }
+
+ private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount)
+ throws IOException {
+ this.pageValueCount = valueCount;
+ this.endOfPageValueCount = valuesRead + pageValueCount;
+ if (dataEncoding.usesDictionary()) {
+ this.dataColumn = null;
+ if (dictionary == null) {
+ throw new IOException(
+ String.format(
+ "Could not read page in col %s because the dictionary was
missing for encoding %s.",
+ descriptor, dataEncoding));
+ }
+ dataColumn =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+ type.asPrimitiveType(),
+ dataEncoding.getDictionaryBasedValuesReader(
+ descriptor, VALUES, dictionary.getDictionary()),
+ isUtcTimestamp);
+ this.isCurrentPageDictionaryEncoded = true;
+ } else {
+ dataColumn =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+ type.asPrimitiveType(),
+ dataEncoding.getValuesReader(descriptor, VALUES),
+ isUtcTimestamp);
+ this.isCurrentPageDictionaryEncoded = false;
+ }
+
+ try {
+ dataColumn.initFromPage(pageValueCount, in);
+ } catch (IOException e) {
+ throw new IOException(String.format("Could not read page in col %s.",
descriptor), e);
+ }
+ }
+
+ private void readPageV1(DataPageV1 page) {
+ ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+ ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+ this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+ this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+ try {
+ BytesInput bytes = page.getBytes();
+ LOG.debug("Page size {} bytes and {} records.", bytes.size(),
pageValueCount);
+ ByteBufferInputStream in = bytes.toInputStream();
+ LOG.debug("Reading repetition levels at {}.", in.position());
+ rlReader.initFromPage(pageValueCount, in);
+ LOG.debug("Reading definition levels at {}.", in.position());
+ dlReader.initFromPage(pageValueCount, in);
+ LOG.debug("Reading data at {}.", in.position());
+ initDataReader(page.getValueEncoding(), in, page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException(
+ String.format("Could not read page %s in col %s.", page,
descriptor), e);
+ }
+ }
+
+ private void readPageV2(DataPageV2 page) {
+ this.pageValueCount = page.getValueCount();
+ this.repetitionLevelColumn =
+ newRLEIterator(descriptor.getMaxRepetitionLevel(), page.getRepetitionLevels());
+ this.definitionLevelColumn =
+ newRLEIterator(descriptor.getMaxDefinitionLevel(), page.getDefinitionLevels());
+ try {
+ LOG.debug(
+ "Page data size {} bytes and {} records.", page.getData().size(),
pageValueCount);
+ initDataReader(
+ page.getDataEncoding(), page.getData().toInputStream(),
page.getValueCount());
+ } catch (IOException e) {
+ throw new ParquetDecodingException(
+ String.format("Could not read page %s in col %s.", page,
descriptor), e);
+ }
+ }
+
+ private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+ try {
+ if (maxLevel == 0) {
+ return new NullIntIterator();
+ }
+ return new RLEIntIterator(
+ new RunLengthBitPackingHybridDecoder(
+ BytesUtils.getWidthFromMaxInt(maxLevel),
+ new ByteArrayInputStream(bytes.toByteArray())));
+ } catch (IOException e) {
+ throw new ParquetDecodingException(
+ String.format("Could not read levels in page for col %s.",
descriptor), e);
+ }
+ }
+
+ /** Utility interface to abstract over different ways to read ints with different encodings. */
+ interface IntIterator {
+ int nextInt();
+ }
+
+ /** Reading int from {@link ValuesReader}. */
+ protected static final class ValuesReaderIntIterator implements IntIterator {
+ ValuesReader delegate;
+
+ public ValuesReaderIntIterator(ValuesReader delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int nextInt() {
+ return delegate.readInteger();
+ }
+ }
+
+ /** Reading int from {@link RunLengthBitPackingHybridDecoder}. */
+ protected static final class RLEIntIterator implements IntIterator {
+ RunLengthBitPackingHybridDecoder delegate;
+
+ public RLEIntIterator(RunLengthBitPackingHybridDecoder delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public int nextInt() {
+ try {
+ return delegate.readInt();
+ } catch (IOException e) {
+ throw new ParquetDecodingException(e);
+ }
+ }
+ }
+
+ /** Reading zero always. */
+ protected static final class NullIntIterator implements IntIterator {
+ @Override
+ public int nextInt() {
+ return 0;
+ }
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
index fdfe5d6fa3a3..3748e53fc284 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
@@ -26,12 +26,16 @@ import org.apache.parquet.bytes.ByteBufferInputStream;
import org.apache.parquet.column.Dictionary;
import org.apache.parquet.column.values.ValuesReader;
import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
import static org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
@@ -252,21 +256,115 @@ public final class ParquetDataColumnReaderFactory {
}
}
+ /**
+ * Reader for Parquet INT64 timestamp values (MILLIS / MICROS / NANOS), i.e. the standard
+ * timestamp encoding defined by Parquet's
+ * {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} and the legacy
+ * {@link OriginalType#TIMESTAMP_MILLIS} / {@link OriginalType#TIMESTAMP_MICROS} annotations.
+ * (The older INT96 encoding is marked deprecated by the Parquet format spec — see
+ * <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp">
+ * LogicalTypes.md</a> — but is still supported here via {@link TypesFromInt96PageReader} for
+ * backwards compatibility with files written by older Hive / Spark / Impala versions.)
+ *
+ * <p>Used by {@link NestedPrimitiveColumnReader} when a TIMESTAMP column sits inside a
+ * {@code Row}, {@code Array} or {@code Map}; the top-level path continues to use
+ * {@link Int64TimestampColumnReader} for batched-vector efficiency.
+ */
+ public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
+ private final boolean isUtcTimestamp;
+ private final ChronoUnit chronoUnit;
+
+ public TypesFromInt64PageReader(
+ ValuesReader realReader, boolean isUtcTimestamp, ChronoUnit chronoUnit) {
+ super(realReader);
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.chronoUnit = chronoUnit;
+ }
+
+ public TypesFromInt64PageReader(
+ Dictionary dict, boolean isUtcTimestamp, ChronoUnit chronoUnit) {
+ super(dict);
+ this.isUtcTimestamp = isUtcTimestamp;
+ this.chronoUnit = chronoUnit;
+ }
+
+ @Override
+ public TimestampData readTimestamp() {
+ return int64ToTimestamp(isUtcTimestamp, valuesReader.readLong(), chronoUnit);
+ }
+
+ @Override
+ public TimestampData readTimestamp(int id) {
+ return int64ToTimestamp(isUtcTimestamp, dict.decodeToLong(id), chronoUnit);
+ }
+ }
+
private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
boolean isDictionary,
PrimitiveType parquetType,
Dictionary dictionary,
ValuesReader valuesReader,
boolean isUtcTimestamp) {
- if (parquetType.getPrimitiveTypeName() == PrimitiveType.PrimitiveTypeName.INT96) {
+ PrimitiveType.PrimitiveTypeName typeName = parquetType.getPrimitiveTypeName();
+ if (typeName == PrimitiveType.PrimitiveTypeName.INT96) {
return isDictionary
? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
: new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
- } else {
- return isDictionary
- ? new DefaultParquetDataColumnReader(dictionary)
- : new DefaultParquetDataColumnReader(valuesReader);
}
+ if (typeName == PrimitiveType.PrimitiveTypeName.INT64) {
+ ChronoUnit unit = resolveInt64TimestampUnit(parquetType);
+ if (unit != null) {
+ return isDictionary
+ ? new TypesFromInt64PageReader(dictionary, isUtcTimestamp, unit)
+ : new TypesFromInt64PageReader(valuesReader, isUtcTimestamp, unit);
+ }
+ }
+ return isDictionary
+ ? new DefaultParquetDataColumnReader(dictionary)
+ : new DefaultParquetDataColumnReader(valuesReader);
+ }
+
+ /**
+ * Returns the {@link ChronoUnit} for a Parquet INT64 TIMESTAMP column, or {@code null} if the
+ * column is a plain INT64 (not a timestamp).
+ *
+ * <p>Supports both the modern {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} and
+ * the legacy {@link OriginalType#TIMESTAMP_MILLIS} / {@link OriginalType#TIMESTAMP_MICROS}
+ * encodings.
+ */
+ private static ChronoUnit resolveInt64TimestampUnit(PrimitiveType parquetType) {
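+ // For example, INT64 annotated timestamp(isAdjustedToUTC=true, MICROS) resolves to
+ // ChronoUnit.MICROS; a plain INT64 returns null and keeps the default reader.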
+ LogicalTypeAnnotation annotation = parquetType.getLogicalTypeAnnotation();
+ if (annotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) {
+ LogicalTypeAnnotation.TimeUnit unit =
+ ((LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) annotation).getUnit();
+ switch (unit) {
+ case MILLIS:
+ return ChronoUnit.MILLIS;
+ case MICROS:
+ return ChronoUnit.MICROS;
+ case NANOS:
+ return ChronoUnit.NANOS;
+ default:
+ return null;
+ }
+ }
+ OriginalType originalType = parquetType.getOriginalType();
+ if (originalType == OriginalType.TIMESTAMP_MILLIS) {
+ return ChronoUnit.MILLIS;
+ }
+ if (originalType == OriginalType.TIMESTAMP_MICROS) {
+ return ChronoUnit.MICROS;
+ }
+ return null;
+ }
+
+ private static TimestampData int64ToTimestamp(
+ boolean utcTimestamp, long value, ChronoUnit unit) {
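+ // Illustrative: value = 1_700_000_000_123L with ChronoUnit.MILLIS yields
+ // 2023-11-14T22:13:20.123Z.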
+ Instant instant = Instant.EPOCH.plus(value, unit);
+ if (utcTimestamp) {
+ return TimestampData.fromInstant(instant);
+ }
+ return TimestampData.fromTimestamp(Timestamp.from(instant));
}
public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
new file mode 100644
index 000000000000..4b48fdd37460
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+/**
+ * Tests for the Flink 2.1-compatible accessors added on {@link HeapArrayVector},
+ * {@link HeapMapColumnVector} and {@link HeapRowColumnVector} when vendoring Flink 2.1's
+ * nested-Parquet reader (FLINK-35702).
+ *
+ * <p>The accessors are wrappers over the existing public fields so legacy callers continue to
+ * work. These tests exist solely to pin down that wrapper contract — runtime correctness of the
+ * Dremel-style read path is exercised end-to-end by integration tests once
+ * {@code ParquetSplitReaderUtil} is wired up to {@code NestedColumnReader} in a follow-up PR.
+ */
+class TestHeapColumnVectorAccessors {
+
+ // -----------------------------------------------------------------------------------------------
+ // HeapArrayVector
+ // -----------------------------------------------------------------------------------------------
+
+ @Test
+ void heapArrayVectorAccessorsReflectPublicFields() {
+ HeapIntVector child = new HeapIntVector(4);
+ HeapArrayVector vector = new HeapArrayVector(2, child);
+
+ long[] offsets = {0L, 2L};
+ long[] lengths = {2L, 2L};
+ HeapLongVector replacementChild = new HeapLongVector(4);
+
+ vector.setOffsets(offsets);
+ vector.setLengths(lengths);
+ vector.setChild(replacementChild);
+ vector.setSize(2);
+
+ assertArrayEquals(offsets, vector.getOffsets());
+ assertArrayEquals(lengths, vector.getLengths());
+ assertSame(replacementChild, vector.getChild());
+ assertEquals(2, vector.getSize());
+
+ // Backing public fields are kept in sync — preserves backward compatibility.
+ assertSame(offsets, vector.offsets);
+ assertSame(lengths, vector.lengths);
+ assertSame(replacementChild, vector.child);
+ }
+
+ // -----------------------------------------------------------------------------------------------
+ // HeapMapColumnVector
+ // -----------------------------------------------------------------------------------------------
+
+ @Test
+ void heapMapColumnVectorConstructorInitializesOffsetsAndLengths() {
+ HeapIntVector keys = new HeapIntVector(4);
+ HeapIntVector values = new HeapIntVector(4);
+
+ HeapMapColumnVector vector = new HeapMapColumnVector(3, keys, values);
+
+ assertEquals(3, vector.getOffsets().length);
+ assertEquals(3, vector.getLengths().length);
+ }
+
+ @Test
+ void heapMapColumnVectorAccessorsReflectInternalState() {
+ HeapIntVector keys = new HeapIntVector(4);
+ HeapIntVector values = new HeapIntVector(4);
+ HeapMapColumnVector vector = new HeapMapColumnVector(2, keys, values);
+
+ long[] offsets = {0L, 2L};
+ long[] lengths = {2L, 2L};
+ HeapLongVector newKeys = new HeapLongVector(4);
+ HeapLongVector newValues = new HeapLongVector(4);
+
+ vector.setOffsets(offsets);
+ vector.setLengths(lengths);
+ vector.setKeys(newKeys);
+ vector.setValues(newValues);
+ vector.setSize(2);
+
+ assertArrayEquals(offsets, vector.getOffsets());
+ assertArrayEquals(lengths, vector.getLengths());
+ assertSame(newKeys, vector.getKeys());
+ assertSame(newValues, vector.getValues());
+ // The Flink-2.1-style ColumnVector accessors return the same underlying child.
+ assertSame(newKeys, vector.getKeyColumnVector());
+ assertSame(newValues, vector.getValueColumnVector());
+ assertEquals(2, vector.getSize());
+ }
+
+ // -----------------------------------------------------------------------------------------------
+ // HeapRowColumnVector
+ // -----------------------------------------------------------------------------------------------
+
+ @Test
+ void heapRowColumnVectorFieldsAccessorsReflectPublicVectors() {
+ HeapIntVector intField = new HeapIntVector(2);
+ HeapLongVector longField = new HeapLongVector(2);
+ HeapRowColumnVector vector = new HeapRowColumnVector(2, intField, longField);
+
+ WritableColumnVector[] originalFields = vector.getFields();
+ assertEquals(2, originalFields.length);
+ assertSame(intField, originalFields[0]);
+ assertSame(longField, originalFields[1]);
+ // Backing public field is kept in sync — preserves backward compatibility.
+ assertSame(originalFields, vector.vectors);
+
+ HeapIntVector replacement = new HeapIntVector(2);
+ WritableColumnVector[] replacementFields = {replacement, longField};
+ vector.setFields(replacementFields);
+
+ assertSame(replacementFields, vector.getFields());
+ assertSame(replacementFields, vector.vectors);
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/reader/TestParquetDataColumnReaderFactory.java b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/reader/TestParquetDataColumnReaderFactory.java
new file mode 100644
index 000000000000..9d6607d03feb
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/reader/TestParquetDataColumnReaderFactory.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Tests for the {@link ParquetDataColumnReaderFactory} INT64 timestamp dispatch added when
+ * vendoring Flink 2.1's nested-Parquet reader (FLINK-35702).
+ *
+ * <p>The factory is exercised end-to-end by integration tests through
+ * {@link NestedPrimitiveColumnReader}; this unit test focuses on the small, deterministic piece
+ * that was added by this PR — selecting the right {@code ParquetDataColumnReader} for each
+ * supported INT64 TIMESTAMP encoding (modern {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation}
+ * MILLIS / MICROS / NANOS plus the legacy {@link OriginalType} encodings) and decoding values
+ * using both the values-reader and dictionary code paths.
+ */
+class TestParquetDataColumnReaderFactory {
+
+ // -----------------------------------------------------------------------------------------------
+ // Type dispatch
+ // -----------------------------------------------------------------------------------------------
+
+ @Test
+ void valuesReaderDispatchInt96TimestampUsesInt96Reader() {
+ PrimitiveType type = Types.required(PrimitiveType.PrimitiveTypeName.INT96).named("ts");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt96PageReader.class, reader);
+ }
+
+ @Test
+ void valuesReaderDispatchInt64WithoutAnnotationUsesDefaultReader() {
+ PrimitiveType type = Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("plainLong");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.DefaultParquetDataColumnReader.class, reader);
+ }
+
+ @Test
+ void valuesReaderDispatchInt64TimestampMillisLogicalUsesInt64Reader() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, reader);
+ }
+
+ @Test
+ void valuesReaderDispatchInt64TimestampMicrosLogicalUsesInt64Reader() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, reader);
+ }
+
+ @Test
+ void valuesReaderDispatchInt64TimestampNanosLogicalUsesInt64Reader() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("ts");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, reader);
+ }
+
+ @Test
+ void valuesReaderDispatchInt64LegacyTimestampMillisOriginalUsesInt64Reader() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(OriginalType.TIMESTAMP_MILLIS)
+ .named("ts");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, reader);
+ }
+
+ @Test
+ void valuesReaderDispatchInt64LegacyTimestampMicrosOriginalUsesInt64Reader() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(OriginalType.TIMESTAMP_MICROS)
+ .named("ts");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, reader);
+ }
+
+ @Test
+ void valuesReaderDispatchInt32DoesNotUseTimestampReader() {
+ PrimitiveType type = Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("i");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.DefaultParquetDataColumnReader.class, reader);
+ }
+
+ @Test
+ void dictionaryReaderDispatchInt64TimestampMillisLogicalUsesInt64Reader() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts");
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+ type, new StubDictionary(), true);
+ assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, reader);
+ }
+
+ // -----------------------------------------------------------------------------------------------
+ // INT64 → TimestampData decoding (per ChronoUnit, values-reader and dictionary paths)
+ // -----------------------------------------------------------------------------------------------
+
+ @Test
+ void int64ReaderReadsTimestampMillisFromValuesReaderInUtc() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts");
+ long epochMillis = 1_700_000_000_123L;
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+ type, new StubValuesReader(epochMillis), true);
+
+ TimestampData ts = reader.readTimestamp();
+ assertNotNull(ts);
+ assertEquals(epochMillis, ts.getMillisecond());
+ assertEquals(0, ts.getNanoOfMillisecond());
+ }
+
+ @Test
+ void int64ReaderReadsTimestampMicrosFromValuesReaderInUtc() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MICROS))
+ .named("ts");
+ long epochMicros = 1_700_000_000_123_456L;
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+ type, new StubValuesReader(epochMicros), true);
+
+ TimestampData ts = reader.readTimestamp();
+ assertNotNull(ts);
+ assertEquals(epochMicros / 1_000L, ts.getMillisecond());
+ // 456 microseconds remain → 456_000 nanoseconds within the millisecond
+ assertEquals(456_000, ts.getNanoOfMillisecond());
+ }
+
+ @Test
+ void int64ReaderReadsTimestampNanosFromValuesReaderInUtc() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS))
+ .named("ts");
+ long epochNanos = 1_700_000_000_123_456_789L;
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+ type, new StubValuesReader(epochNanos), true);
+
+ TimestampData ts = reader.readTimestamp();
+ assertNotNull(ts);
+ assertEquals(epochNanos / 1_000_000L, ts.getMillisecond());
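+ // 456_789 nanoseconds remain within the millisecond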
+ assertEquals(456_789, ts.getNanoOfMillisecond());
+ }
+
+ @Test
+ void int64ReaderReadsTimestampMillisFromDictionaryInUtc() {
+ PrimitiveType type =
+ Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+ .as(LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS))
+ .named("ts");
+ long epochMillis = 1_700_000_000_456L;
+ ParquetDataColumnReader reader =
+ ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+ type, new StubDictionary(epochMillis), true);
+
+ TimestampData ts = reader.readTimestamp(0);
+ assertNotNull(ts);
+ assertEquals(epochMillis, ts.getMillisecond());
+ }
+
+ // -----------------------------------------------------------------------------------------------
+ // Stubs (only the methods exercised by the dispatch + decoding tests above)
+ // -----------------------------------------------------------------------------------------------
+
+ /** Minimal {@link ValuesReader} returning a fixed long; other methods throw. */
+ private static final class StubValuesReader extends ValuesReader {
+ private final long fixedLong;
+
+ StubValuesReader() {
+ this(0L);
+ }
+
+ StubValuesReader(long fixedLong) {
+ this.fixedLong = fixedLong;
+ }
+
+ @Override
+ public long readLong() {
+ return fixedLong;
+ }
+
+ @Override
+ public void skip() {
+ // unused
+ }
+ }
+
+ /** Minimal {@link Dictionary} returning a fixed long for any id; other methods throw. */
+ private static final class StubDictionary extends Dictionary {
+ private final long fixedLong;
+
+ StubDictionary() {
+ this(0L);
+ }
+
+ StubDictionary(long fixedLong) {
+ super(null);
+ this.fixedLong = fixedLong;
+ }
+
+ @Override
+ public Binary decodeToBinary(int id) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long decodeToLong(int id) {
+ return fixedLong;
+ }
+
+ @Override
+ public int getMaxId() {
+ return 0;
+ }
+ }
+}