This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 40295605aae6 feat(flink): Backport Flink 2.1 nested Parquet column 
readers and INT64 timestamp dispatch (FLINK-35702) (#18636)
40295605aae6 is described below

commit 40295605aae691da07e93a1a2919d38b41cbb6dd
Author: Shihuan Liu <[email protected]>
AuthorDate: Wed May 6 23:23:48 2026 -0700

    feat(flink): Backport Flink 2.1 nested Parquet column readers and INT64 
timestamp dispatch (FLINK-35702) (#18636)
    
    * feat(flink): Backport Flink 2.1 nested Parquet column readers and INT64 
timestamp dispatch (FLINK-35702)
    * Minor fixes
---
 .../table/format/cow/vector/HeapArrayVector.java   |  31 +
 .../format/cow/vector/HeapMapColumnVector.java     |  63 +-
 .../format/cow/vector/HeapRowColumnVector.java     |  15 +
 .../cow/vector/reader/NestedColumnReader.java      | 257 +++++++++
 .../vector/reader/NestedPrimitiveColumnReader.java | 639 +++++++++++++++++++++
 .../reader/ParquetDataColumnReaderFactory.java     | 108 +++-
 .../cow/vector/TestHeapColumnVectorAccessors.java  | 138 +++++
 .../reader/TestParquetDataColumnReaderFactory.java | 272 +++++++++
 8 files changed, 1516 insertions(+), 7 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
index a0dced01e5e8..2f21a323302f 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayVector.java
@@ -57,6 +57,37 @@ public class HeapArrayVector extends AbstractHeapVector
     return this.isNull.length;
   }
 
+  // 
---------------------------------------------------------------------------------------------
+  // Flink 2.1-compatible accessors. Backed by the existing public {@code 
offsets}, {@code lengths}
+  // and {@code child} fields so legacy callers continue to work; the new 
{@link
+  // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} 
(FLINK-35702 port) and any
+  // future Flink-2.1-style caller use these accessors.
+  // 
---------------------------------------------------------------------------------------------
+
+  public long[] getOffsets() {
+    return offsets;
+  }
+
+  public void setOffsets(long[] offsets) {
+    this.offsets = offsets;
+  }
+
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  public void setLengths(long[] lengths) {
+    this.lengths = lengths;
+  }
+
+  public ColumnVector getChild() {
+    return child;
+  }
+
+  public void setChild(ColumnVector child) {
+    this.child = child;
+  }
+
   @Override
   public ArrayData getArray(int i) {
     long offset = offsets[i];
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
index 0d83f82baedf..a98bdebd707a 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapMapColumnVector.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table.format.cow.vector;
 
 import lombok.Getter;
 import org.apache.flink.table.data.MapData;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
 import org.apache.flink.table.data.columnar.vector.MapColumnVector;
 import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
 import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
@@ -31,16 +32,74 @@ public class HeapMapColumnVector extends AbstractHeapVector
     implements WritableColumnVector, MapColumnVector {
 
   @Getter
-  private final WritableColumnVector keys;
+  private WritableColumnVector keys;
   @Getter
-  private final WritableColumnVector values;
+  private WritableColumnVector values;
+
+  // 
---------------------------------------------------------------------------------------------
+  // Flink 2.1 Dremel-style state. Populated by {@link
+  // org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} 
(FLINK-35702 port). The
+  // legacy {@link #getMap(int)} implementation below continues to use {@code 
ColumnarGroupMapData}
+  // — wiring it through these offsets/lengths happens in a follow-up PR that 
switches the read
+  // path. Left here so the new readers can compile against the additive 
surface.
+  // 
---------------------------------------------------------------------------------------------
+  private long[] offsets;
+  private long[] lengths;
+  private int size;
 
   public HeapMapColumnVector(int len, WritableColumnVector keys, 
WritableColumnVector values) {
     super(len);
+    this.offsets = new long[len];
+    this.lengths = new long[len];
     this.keys = keys;
     this.values = values;
   }
 
+  public long[] getOffsets() {
+    return offsets;
+  }
+
+  public void setOffsets(long[] offsets) {
+    this.offsets = offsets;
+  }
+
+  public long[] getLengths() {
+    return lengths;
+  }
+
+  public void setLengths(long[] lengths) {
+    this.lengths = lengths;
+  }
+
+  public int getSize() {
+    return size;
+  }
+
+  public void setSize(int size) {
+    this.size = size;
+  }
+
+  public void setKeys(WritableColumnVector keys) {
+    this.keys = keys;
+  }
+
+  public void setValues(WritableColumnVector values) {
+    this.values = values;
+  }
+
+  /**
+   * Returns the keys child vector typed as {@link ColumnVector}, matching the 
Flink 2.1 contract
+   * consumed by {@code NestedColumnReader}. Functionally equivalent to {@link 
#getKeys()}.
+   */
+  public ColumnVector getKeyColumnVector() {
+    return keys;
+  }
+
+  /** Counterpart of {@link #getKeyColumnVector()} for the values child 
vector. */
+  public ColumnVector getValueColumnVector() {
+    return values;
+  }
+
   @Override
   public MapData getMap(int rowId) {
     return new ColumnarGroupMapData(keys, values, rowId);
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
index ae194e4e6ab0..0c640ce92ee4 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapRowColumnVector.java
@@ -37,6 +37,21 @@ public class HeapRowColumnVector extends AbstractHeapVector
     this.vectors = vectors;
   }
 
+  /**
+   * Flink 2.1-compatible accessor for the children vectors. Backed by the 
existing public {@code
+   * vectors} field so legacy callers continue to work; the new {@link
+   * org.apache.hudi.table.format.cow.vector.reader.NestedColumnReader} 
(FLINK-35702 port) and any
+   * future Flink-2.1-style caller use this accessor.
+   */
+  public WritableColumnVector[] getFields() {
+    return vectors;
+  }
+
+  /** Counterpart of {@link #getFields()}. */
+  public void setFields(WritableColumnVector[] fields) {
+    this.vectors = fields;
+  }
+
   @Override
   public ColumnarRowData getRow(int i) {
     ColumnarRowData columnarRowData = new ColumnarRowData(new 
VectorizedColumnBatch(vectors));
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
new file mode 100644
index 000000000000..27eab298b320
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.utils.NestedPositionUtil;
+import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
+import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
+import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
+import org.apache.hudi.table.format.cow.vector.position.CollectionPosition;
+import org.apache.hudi.table.format.cow.vector.position.LevelDelegation;
+import org.apache.hudi.table.format.cow.vector.position.RowPosition;
+import org.apache.hudi.table.format.cow.vector.type.ParquetField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetGroupField;
+import org.apache.hudi.table.format.cow.vector.type.ParquetPrimitiveField;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.columnar.vector.ColumnVector;
+import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.ArrayType;
+import org.apache.flink.table.types.logical.MapType;
+import org.apache.flink.table.types.logical.MultisetType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.page.PageReadStore;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * ColumnReader used to read a {@code Group} type in Parquet ({@code Map}, 
{@code Array}, {@code
+ * Row}). Resolves nested structures using Dremel striping/assembly; see <a
+ * 
href="https://github.com/julienledem/redelm/wiki/The-striping-and-assembly-algorithms-from-the-Dremel-paper">the
+ * striping and assembly algorithms from the Dremel paper</a>.
+ *
+ * <p>Vendored from Apache Flink 2.1 (FLINK-35702, {@code
+ * org.apache.flink.formats.parquet.vector.reader.NestedColumnReader}). 
Differences vs. upstream:
+ *
+ * <ul>
+ *   <li>Uses Hudi-local {@code HeapRowColumnVector}/{@code 
HeapMapColumnVector}/{@code
+ *       HeapArrayVector} instead of the Flink-private {@code 
HeapRowVector}/{@code
+ *       HeapMapVector}/{@code HeapArrayVector}.
+ *   <li>Supports Hudi's schema-evolution contract: a {@code 
ParquetGroupField} representing a
+ *       {@link RowType} may contain {@code null} children — meaning the 
corresponding logical
+ *       field is absent from the Parquet file. Those slots are passed through 
unchanged and do
+ *       not contribute to the row's repetition/definition-level stream.
+ * </ul>
+ */
+public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
+
+  private final Map<ColumnDescriptor, NestedPrimitiveColumnReader> 
columnReaders;
+  private final boolean isUtcTimestamp;
+
+  private final PageReadStore pages;
+
+  private final ParquetField field;
+
+  public NestedColumnReader(boolean isUtcTimestamp, PageReadStore pages, 
ParquetField field) {
+    this.isUtcTimestamp = isUtcTimestamp;
+    this.pages = pages;
+    this.field = field;
+    this.columnReaders = new HashMap<>();
+  }
+
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
+    readData(field, readNumber, vector, false);
+  }
+
+  private Tuple2<LevelDelegation, WritableColumnVector> readData(
+      ParquetField field, int readNumber, ColumnVector vector, boolean inside) 
throws IOException {
+    if (field.getType() instanceof RowType) {
+      return readRow((ParquetGroupField) field, readNumber, vector, inside);
+    } else if (field.getType() instanceof MapType || field.getType() 
instanceof MultisetType) {
+      return readMap((ParquetGroupField) field, readNumber, vector, inside);
+    } else if (field.getType() instanceof ArrayType) {
+      return readArray((ParquetGroupField) field, readNumber, vector, inside);
+    } else {
+      return readPrimitive((ParquetPrimitiveField) field, readNumber, vector);
+    }
+  }
+
+  private Tuple2<LevelDelegation, WritableColumnVector> readRow(
+      ParquetGroupField field, int readNumber, ColumnVector vector, boolean 
inside)
+      throws IOException {
+    HeapRowColumnVector heapRowVector = (HeapRowColumnVector) vector;
+    LevelDelegation levelDelegation = null;
+    List<ParquetField> children = field.getChildren();
+    WritableColumnVector[] childrenVectors = heapRowVector.getFields();
+    WritableColumnVector[] finalChildrenVectors = new 
WritableColumnVector[childrenVectors.length];
+    for (int i = 0; i < children.size(); i++) {
+      ParquetField child = children.get(i);
+      if (child == null) {
+        // Hudi schema-evolution: the logical field is not present in the 
Parquet file. The slot
+        // vector is expected to be pre-populated with nulls by the caller 
(lands in a follow-up
+        // PR that rewires ParquetSplitReaderUtil); keep it as is and skip 
contributing to the
+        // level stream.
+        finalChildrenVectors[i] = childrenVectors[i];
+        continue;
+      }
+      Tuple2<LevelDelegation, WritableColumnVector> tuple =
+          readData(child, readNumber, childrenVectors[i], true);
+      levelDelegation = tuple.f0;
+      finalChildrenVectors[i] = tuple.f1;
+    }
+    if (levelDelegation == null) {
+      throw new FlinkRuntimeException(
+          String.format("Row field does not have any non-null children: %s.", 
field));
+    }
+
+    RowPosition rowPosition =
+        NestedPositionUtil.calculateRowOffsets(
+            field,
+            levelDelegation.getDefinitionLevel(),
+            levelDelegation.getRepetitionLevel());
+
+    // If row was inside the structure, then we need to renew the vector to 
reset the
+    // capacity.
+    if (inside) {
+      heapRowVector = new HeapRowColumnVector(rowPosition.getPositionsCount(), 
finalChildrenVectors);
+    } else {
+      heapRowVector.setFields(finalChildrenVectors);
+    }
+
+    if (rowPosition.getIsNull() != null) {
+      setFieldNullFlag(rowPosition.getIsNull(), heapRowVector);
+    }
+    return Tuple2.of(levelDelegation, heapRowVector);
+  }
+
+  private Tuple2<LevelDelegation, WritableColumnVector> readMap(
+      ParquetGroupField field, int readNumber, ColumnVector vector, boolean 
inside)
+      throws IOException {
+    HeapMapColumnVector mapVector = (HeapMapColumnVector) vector;
+    mapVector.reset();
+    List<ParquetField> children = field.getChildren();
+    Preconditions.checkArgument(
+        children.size() == 2,
+        "Maps must have two type parameters, found %s",
+        children.size());
+    Tuple2<LevelDelegation, WritableColumnVector> keyTuple =
+        readData(children.get(0), readNumber, mapVector.getKeyColumnVector(), 
true);
+    Tuple2<LevelDelegation, WritableColumnVector> valueTuple =
+        readData(children.get(1), readNumber, 
mapVector.getValueColumnVector(), true);
+
+    LevelDelegation levelDelegation = keyTuple.f0;
+
+    CollectionPosition collectionPosition =
+        NestedPositionUtil.calculateCollectionOffsets(
+            field,
+            levelDelegation.getDefinitionLevel(),
+            levelDelegation.getRepetitionLevel());
+
+    // If map was inside the structure, then we need to renew the vector to 
reset the
+    // capacity.
+    if (inside) {
+      mapVector = new HeapMapColumnVector(collectionPosition.getValueCount(), 
keyTuple.f1, valueTuple.f1);
+    } else {
+      mapVector.setKeys(keyTuple.f1);
+      mapVector.setValues(valueTuple.f1);
+    }
+
+    if (collectionPosition.getIsNull() != null) {
+      setFieldNullFlag(collectionPosition.getIsNull(), mapVector);
+    }
+
+    mapVector.setLengths(collectionPosition.getLength());
+    mapVector.setOffsets(collectionPosition.getOffsets());
+
+    return Tuple2.of(levelDelegation, mapVector);
+  }
+
+  private Tuple2<LevelDelegation, WritableColumnVector> readArray(
+      ParquetGroupField field, int readNumber, ColumnVector vector, boolean 
inside)
+      throws IOException {
+    HeapArrayVector arrayVector = (HeapArrayVector) vector;
+    arrayVector.reset();
+    List<ParquetField> children = field.getChildren();
+    Preconditions.checkArgument(
+        children.size() == 1,
+        "Arrays must have a single type parameter, found %s",
+        children.size());
+    Tuple2<LevelDelegation, WritableColumnVector> tuple =
+        readData(children.get(0), readNumber, arrayVector.getChild(), true);
+
+    LevelDelegation levelDelegation = tuple.f0;
+    CollectionPosition collectionPosition =
+        NestedPositionUtil.calculateCollectionOffsets(
+            field,
+            levelDelegation.getDefinitionLevel(),
+            levelDelegation.getRepetitionLevel());
+
+    // If array was inside the structure, then we need to renew the vector to 
reset the
+    // capacity.
+    if (inside) {
+      arrayVector = new HeapArrayVector(collectionPosition.getValueCount(), 
tuple.f1);
+    } else {
+      arrayVector.setChild(tuple.f1);
+    }
+
+    if (collectionPosition.getIsNull() != null) {
+      setFieldNullFlag(collectionPosition.getIsNull(), arrayVector);
+    }
+    arrayVector.setLengths(collectionPosition.getLength());
+    arrayVector.setOffsets(collectionPosition.getOffsets());
+    return Tuple2.of(levelDelegation, arrayVector);
+  }
+
+  private Tuple2<LevelDelegation, WritableColumnVector> readPrimitive(
+      ParquetPrimitiveField field, int readNumber, ColumnVector vector) throws 
IOException {
+    ColumnDescriptor descriptor = field.getDescriptor();
+    NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
+    if (reader == null) {
+      reader =
+          new NestedPrimitiveColumnReader(
+              descriptor,
+              pages.getPageReader(descriptor),
+              isUtcTimestamp,
+              descriptor.getPrimitiveType(),
+              field.getType());
+      columnReaders.put(descriptor, reader);
+    }
+    WritableColumnVector writableColumnVector =
+        reader.readAndNewVector(readNumber, (WritableColumnVector) vector);
+    return Tuple2.of(reader.getLevelDelegation(), writableColumnVector);
+  }
+
+  private static void setFieldNullFlag(boolean[] nullFlags, AbstractHeapVector 
vector) {
+    for (int index = 0; index < vector.getLen() && index < nullFlags.length; 
index++) {
+      if (nullFlags[index]) {
+        vector.setNullAt(index);
+      }
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
new file mode 100644
index 000000000000..3f0aefe2af74
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.hudi.table.format.cow.utils.IntArrayList;
+import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
+import org.apache.hudi.table.format.cow.vector.position.LevelDelegation;
+
+import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
+import org.apache.flink.table.data.TimestampData;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBooleanVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapByteVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapDoubleVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapFloatVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapShortVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapTimestampVector;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPage;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DataPageV2;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.PageReader;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridDecoder;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Type;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
+import static org.apache.parquet.column.ValuesType.VALUES;
+
+/**
+ * Reader to read a single primitive leaf column that participates in a nested 
(Dremel) structure.
+ *
+ * <p>Vendored from Apache Flink 2.1 (FLINK-35702, {@code
+ * 
org.apache.flink.formats.parquet.vector.reader.NestedPrimitiveColumnReader}). 
Only the package
+ * and the Hudi-local {@link ParquetDecimalVector} / {@link LevelDelegation} / 
{@link IntArrayList}
+ * imports are changed; the algorithm is untouched. The companion 
Hudi-specific {@code
+ * Int64TimestampColumnReader} / {@code FixedLenBytesColumnReader} behaviours 
stay at the leaf-
+ * reader creation boundary in {@code ParquetSplitReaderUtil} (lands in a 
follow-up PR), not inside
+ * this class — keeping it a faithful copy of upstream.
+ */
+public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnVector> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(NestedPrimitiveColumnReader.class);
+
+  private final IntArrayList repetitionLevelList = new IntArrayList(0);
+  private final IntArrayList definitionLevelList = new IntArrayList(0);
+
+  private final PageReader pageReader;
+  private final ColumnDescriptor descriptor;
+  private final Type type;
+  private final LogicalType logicalType;
+
+  /** The dictionary, if this column has dictionary encoding. */
+  private final ParquetDataColumnReader dictionary;
+
+  /** Maximum definition level for this column. */
+  private final int maxDefLevel;
+
+  private boolean isUtcTimestamp;
+
+  /** Total number of values read. */
+  private long valuesRead;
+
+  /**
+   * value that indicates the end of the current page. That is, if valuesRead 
==
+   * endOfPageValueCount, we are at the end of the page.
+   */
+  private long endOfPageValueCount;
+
+  /** If true, the current page is dictionary encoded. */
+  private boolean isCurrentPageDictionaryEncoded;
+
+  private int definitionLevel;
+  private int repetitionLevel;
+
+  /** Repetition/Definition/Value readers. */
+  private IntIterator repetitionLevelColumn;
+
+  private IntIterator definitionLevelColumn;
+  private ParquetDataColumnReader dataColumn;
+
+  /** Total values in the current page. */
+  private int pageValueCount;
+
+  // flag to indicate if there is no data in parquet data page
+  private boolean eof = false;
+
+  private boolean isFirstRow = true;
+
+  private Object lastValue;
+
+  public NestedPrimitiveColumnReader(
+      ColumnDescriptor descriptor,
+      PageReader pageReader,
+      boolean isUtcTimestamp,
+      Type parquetType,
+      LogicalType logicalType)
+      throws IOException {
+    this.descriptor = descriptor;
+    this.type = parquetType;
+    this.pageReader = pageReader;
+    this.maxDefLevel = descriptor.getMaxDefinitionLevel();
+    this.isUtcTimestamp = isUtcTimestamp;
+    this.logicalType = logicalType;
+
+    DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
+    if (dictionaryPage != null) {
+      try {
+        this.dictionary =
+            
ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+                parquetType.asPrimitiveType(),
+                dictionaryPage.getEncoding().initDictionary(descriptor, 
dictionaryPage),
+                isUtcTimestamp);
+        this.isCurrentPageDictionaryEncoded = true;
+      } catch (IOException e) {
+        throw new IOException(
+            String.format("Could not decode the dictionary for %s", 
descriptor), e);
+      }
+    } else {
+      this.dictionary = null;
+      this.isCurrentPageDictionaryEncoded = false;
+    }
+  }
+
+  // Not invoked directly; callers use readAndNewVector instead.
+  @Override
+  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
+    throw new UnsupportedOperationException("This function should not be 
called.");
+  }
+
+  public WritableColumnVector readAndNewVector(int readNumber, 
WritableColumnVector vector)
+      throws IOException {
+    if (isFirstRow) {
+      if (!readValue()) {
+        return vector;
+      }
+      isFirstRow = false;
+    }
+
+    // index to set value.
+    int index = 0;
+    int valueIndex = 0;
+    List<Object> valueList = new ArrayList<>();
+
+    // A repeated type needs two loops to read its data.
+    while (!eof && index < readNumber) {
+      do {
+        valueList.add(lastValue);
+        valueIndex++;
+      } while (readValue() && (repetitionLevel != 0));
+      index++;
+    }
+
+    return fillColumnVector(valueIndex, valueList);
+  }
+
+  public LevelDelegation getLevelDelegation() {
+    int[] repetition = repetitionLevelList.toArray();
+    int[] definition = definitionLevelList.toArray();
+    repetitionLevelList.clear();
+    definitionLevelList.clear();
+    repetitionLevelList.add(repetitionLevel);
+    definitionLevelList.add(definitionLevel);
+    return new LevelDelegation(repetition, definition);
+  }
+
+  private boolean readValue() throws IOException {
+    int left = readPageIfNeed();
+    if (left > 0) {
+      // get the values of repetition and definitionLevel
+      readAndSaveRepetitionAndDefinitionLevels();
+      // read the data if it isn't null
+      if (definitionLevel == maxDefLevel) {
+        if (isCurrentPageDictionaryEncoded) {
+          int dictionaryId = dataColumn.readValueDictionaryId();
+          lastValue = dictionaryDecodeValue(logicalType, dictionaryId);
+        } else {
+          lastValue = readPrimitiveTypedRow(logicalType);
+        }
+      } else {
+        lastValue = null;
+      }
+      return true;
+    } else {
+      eof = true;
+      return false;
+    }
+  }
+
+  private void readAndSaveRepetitionAndDefinitionLevels() {
+    // get the values of repetition and definitionLevel
+    repetitionLevel = repetitionLevelColumn.nextInt();
+    definitionLevel = definitionLevelColumn.nextInt();
+    valuesRead++;
+    repetitionLevelList.add(repetitionLevel);
+    definitionLevelList.add(definitionLevel);
+  }
+
+  private int readPageIfNeed() throws IOException {
+    // Compute the number of values we want to read in this page.
+    int leftInPage = (int) (endOfPageValueCount - valuesRead);
+    if (leftInPage == 0) {
+      // no data left in current page, load data from new page
+      readPage();
+      leftInPage = (int) (endOfPageValueCount - valuesRead);
+    }
+    return leftInPage;
+  }
+
+  private Object readPrimitiveTypedRow(LogicalType category) {
+    switch (category.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
+      case VARBINARY:
+        return dataColumn.readBytes();
+      case BOOLEAN:
+        return dataColumn.readBoolean();
+      case TIME_WITHOUT_TIME_ZONE:
+      case DATE:
+      case INTEGER:
+        return dataColumn.readInteger();
+      case TINYINT:
+        return dataColumn.readTinyInt();
+      case SMALLINT:
+        return dataColumn.readSmallInt();
+      case BIGINT:
+        return dataColumn.readLong();
+      case FLOAT:
+        return dataColumn.readFloat();
+      case DOUBLE:
+        return dataColumn.readDouble();
+      case DECIMAL:
+        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+          case INT32:
+            return dataColumn.readInteger();
+          case INT64:
+            return dataColumn.readLong();
+          case BINARY:
+          case FIXED_LEN_BYTE_ARRAY:
+            return dataColumn.readBytes();
+          default:
+            throw new RuntimeException(
+                "Unsupported physical type for DECIMAL: " + 
descriptor.getPrimitiveType());
+        }
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return dataColumn.readTimestamp();
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+
+  /**
+   * Decodes a dictionary id into the value stored for it in the page dictionary.
+   *
+   * <p>The boxed type returned for each type root has to match the cast that
+   * {@code fillColumnVector} applies to list elements of that type.
+   *
+   * @param category logical type whose type root selects the dictionary accessor
+   * @param dictionaryValue dictionary id to decode; {@code null} yields {@code null}
+   * @return the decoded value, or {@code null} when {@code dictionaryValue} is {@code null}
+   * @throws RuntimeException if the type root, or the physical type backing a DECIMAL, is not
+   *     handled
+   */
+  private Object dictionaryDecodeValue(LogicalType category, Integer dictionaryValue) {
+    if (dictionaryValue == null) {
+      return null;
+    }
+
+    switch (category.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
+      case VARBINARY:
+        return dictionary.readBytes(dictionaryValue);
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+      case INTEGER:
+        return dictionary.readInteger(dictionaryValue);
+      case BOOLEAN:
+        // Return the boolean itself (boxed to Boolean), like the plain-encoded path that returns
+        // dataColumn.readBoolean(). fillColumnVector casts BOOLEAN elements to Boolean, so the
+        // former Integer 1/0 result would fail there with a ClassCastException.
+        return dictionary.readBoolean(dictionaryValue);
+      case DOUBLE:
+        return dictionary.readDouble(dictionaryValue);
+      case FLOAT:
+        return dictionary.readFloat(dictionaryValue);
+      case TINYINT:
+        return dictionary.readTinyInt(dictionaryValue);
+      case SMALLINT:
+        return dictionary.readSmallInt(dictionaryValue);
+      case BIGINT:
+        return dictionary.readLong(dictionaryValue);
+      case DECIMAL:
+        // DECIMAL is decoded according to the physical Parquet type that carries it.
+        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+          case INT32:
+            return dictionary.readInteger(dictionaryValue);
+          case INT64:
+            return dictionary.readLong(dictionaryValue);
+          case FIXED_LEN_BYTE_ARRAY:
+          case BINARY:
+            return dictionary.readBytes(dictionaryValue);
+          default:
+            throw new RuntimeException(
+                "Unsupported physical type for DECIMAL: " + descriptor.getPrimitiveType());
+        }
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+        return dictionary.readTimestamp(dictionaryValue);
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+
+  /**
+   * Copies the collected element values into a newly allocated heap vector whose kind is chosen
+   * by the reader's logical type.
+   *
+   * <p>A {@code null} element marks its slot as null; any other element is cast to the boxed
+   * type expected for the logical type. DECIMAL values are stored according to the physical
+   * Parquet type (INT32, INT64, otherwise bytes) and wrapped in a {@link ParquetDecimalVector}.
+   *
+   * @param total capacity passed to the vector constructor
+   * @param valueList decoded elements; slot {@code i} receives element {@code i}
+   * @return the populated vector
+   * @throws RuntimeException if the logical type is not handled
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private WritableColumnVector fillColumnVector(int total, List valueList) {
+    switch (logicalType.getTypeRoot()) {
+      case CHAR:
+      case VARCHAR:
+      case BINARY:
+      case VARBINARY:
+        return getHeapBytesVector(total, valueList);
+      case BOOLEAN: {
+        HeapBooleanVector booleans = new HeapBooleanVector(total);
+        for (int slot = 0; slot < valueList.size(); slot++) {
+          Object element = valueList.get(slot);
+          if (element == null) {
+            booleans.setNullAt(slot);
+          } else {
+            booleans.vector[slot] = (Boolean) element;
+          }
+        }
+        return booleans;
+      }
+      case TINYINT: {
+        HeapByteVector tinyInts = new HeapByteVector(total);
+        for (int slot = 0; slot < valueList.size(); slot++) {
+          Object element = valueList.get(slot);
+          if (element == null) {
+            tinyInts.setNullAt(slot);
+          } else {
+            tinyInts.vector[slot] = (byte) ((Integer) element).intValue();
+          }
+        }
+        return tinyInts;
+      }
+      case SMALLINT: {
+        HeapShortVector smallInts = new HeapShortVector(total);
+        for (int slot = 0; slot < valueList.size(); slot++) {
+          Object element = valueList.get(slot);
+          if (element == null) {
+            smallInts.setNullAt(slot);
+          } else {
+            smallInts.vector[slot] = (short) ((Integer) element).intValue();
+          }
+        }
+        return smallInts;
+      }
+      case INTEGER:
+      case DATE:
+      case TIME_WITHOUT_TIME_ZONE:
+        return fillHeapIntVector(total, valueList);
+      case FLOAT: {
+        HeapFloatVector floats = new HeapFloatVector(total);
+        for (int slot = 0; slot < valueList.size(); slot++) {
+          Object element = valueList.get(slot);
+          if (element == null) {
+            floats.setNullAt(slot);
+          } else {
+            floats.vector[slot] = (Float) element;
+          }
+        }
+        return floats;
+      }
+      case BIGINT:
+        return fillHeapLongVector(total, valueList);
+      case DOUBLE: {
+        HeapDoubleVector doubles = new HeapDoubleVector(total);
+        for (int slot = 0; slot < valueList.size(); slot++) {
+          Object element = valueList.get(slot);
+          if (element == null) {
+            doubles.setNullAt(slot);
+          } else {
+            doubles.vector[slot] = (Double) element;
+          }
+        }
+        return doubles;
+      }
+      case TIMESTAMP_WITHOUT_TIME_ZONE:
+      case TIMESTAMP_WITH_LOCAL_TIME_ZONE: {
+        HeapTimestampVector timestamps = new HeapTimestampVector(total);
+        for (int slot = 0; slot < valueList.size(); slot++) {
+          Object element = valueList.get(slot);
+          if (element == null) {
+            timestamps.setNullAt(slot);
+          } else {
+            timestamps.setTimestamp(slot, (TimestampData) element);
+          }
+        }
+        return timestamps;
+      }
+      case DECIMAL:
+        // The backing vector follows the physical Parquet type of the decimal column.
+        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
+          case INT32:
+            return new ParquetDecimalVector(fillHeapIntVector(total, valueList));
+          case INT64:
+            return new ParquetDecimalVector(fillHeapLongVector(total, valueList));
+          default:
+            return new ParquetDecimalVector(getHeapBytesVector(total, valueList));
+        }
+      default:
+        throw new RuntimeException("Unsupported type in the list: " + type);
+    }
+  }
+
+  /** Builds a {@link HeapIntVector} from Integer elements, marking null elements as null. */
+  @SuppressWarnings("rawtypes")
+  private static HeapIntVector fillHeapIntVector(int total, List valueList) {
+    HeapIntVector ints = new HeapIntVector(total);
+    for (int slot = 0; slot < valueList.size(); slot++) {
+      Object element = valueList.get(slot);
+      if (element == null) {
+        ints.setNullAt(slot);
+      } else {
+        ints.vector[slot] = (Integer) element;
+      }
+    }
+    return ints;
+  }
+
+  /** Builds a {@link HeapLongVector} from Long elements, marking null elements as null. */
+  @SuppressWarnings("rawtypes")
+  private static HeapLongVector fillHeapLongVector(int total, List valueList) {
+    HeapLongVector longs = new HeapLongVector(total);
+    for (int slot = 0; slot < valueList.size(); slot++) {
+      Object element = valueList.get(slot);
+      if (element == null) {
+        longs.setNullAt(slot);
+      } else {
+        longs.vector[slot] = (Long) element;
+      }
+    }
+    return longs;
+  }
+
+  /**
+   * Builds a {@link HeapBytesVector} from {@code byte[]} elements.
+   *
+   * @param total capacity passed to the vector constructor
+   * @param valueList elements to copy; a {@code null} element marks its slot as null
+   * @return the populated bytes vector
+   */
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  private static HeapBytesVector getHeapBytesVector(int total, List valueList) {
+    final HeapBytesVector bytesVector = new HeapBytesVector(total);
+    int slot = 0;
+    while (slot < valueList.size()) {
+      final byte[] payload = ((List<byte[]>) valueList).get(slot);
+      if (payload != null) {
+        bytesVector.appendBytes(slot, payload, 0, payload.length);
+      } else {
+        bytesVector.setNullAt(slot);
+      }
+      slot++;
+    }
+    return bytesVector;
+  }
+
+  /**
+   * Fetches the next data page from the page reader and, when one is available, hands it to
+   * {@code readPageV1} or {@code readPageV2} depending on the page version. Does nothing when
+   * the page reader returns {@code null}.
+   */
+  protected void readPage() {
+    final DataPage nextPage = pageReader.readPage();
+    if (nextPage != null) {
+      nextPage.accept(
+          new DataPage.Visitor<Void>() {
+            @Override
+            public Void visit(DataPageV1 v1Page) {
+              readPageV1(v1Page);
+              return null;
+            }
+
+            @Override
+            public Void visit(DataPageV2 v2Page) {
+              readPageV2(v2Page);
+              return null;
+            }
+          });
+    }
+  }
+
+  /**
+   * Creates the value reader for the current page and positions it on the page's data.
+   *
+   * <p>Also records the page's value count, the running end-of-page count, and whether the page
+   * is dictionary encoded.
+   *
+   * @param dataEncoding encoding of the page's values
+   * @param in stream from which the value reader is initialized
+   * @param valueCount number of values in the page
+   * @throws IOException if the page is dictionary encoded but no dictionary is available, or if
+   *     the value reader cannot be initialized from the stream
+   */
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount)
+      throws IOException {
+    this.pageValueCount = valueCount;
+    this.endOfPageValueCount = valuesRead + pageValueCount;
+
+    final boolean dictionaryEncoded = dataEncoding.usesDictionary();
+    if (dictionaryEncoded) {
+      this.dataColumn = null;
+      if (dictionary == null) {
+        throw new IOException(
+            String.format(
+                "Could not read page in col %s because the dictionary was missing for encoding %s.",
+                descriptor, dataEncoding));
+      }
+    }
+
+    final PrimitiveType primitive = type.asPrimitiveType();
+    final ValuesReader pageValues =
+        dictionaryEncoded
+            ? dataEncoding.getDictionaryBasedValuesReader(
+                descriptor, VALUES, dictionary.getDictionary())
+            : dataEncoding.getValuesReader(descriptor, VALUES);
+    this.dataColumn =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            primitive, pageValues, isUtcTimestamp);
+    this.isCurrentPageDictionaryEncoded = dictionaryEncoded;
+
+    try {
+      dataColumn.initFromPage(pageValueCount, in);
+    } catch (IOException e) {
+      throw new IOException(String.format("Could not read page in col %s.", descriptor), e);
+    }
+  }
+
+  /**
+   * Initializes the repetition-level, definition-level and value readers for a V1 data page.
+   *
+   * <p>The same input stream is handed in turn to the repetition-level reader, the
+   * definition-level reader and the data reader.
+   *
+   * @param page the V1 data page to read
+   * @throws ParquetDecodingException if the page bytes cannot be read
+   */
+  private void readPageV1(DataPageV1 page) {
+    // Take the count from this page before it is used below, as readPageV2 does. Otherwise the
+    // log line and both level readers would see the value left over from the previous page,
+    // because initDataReader only assigns pageValueCount at the very end of this method.
+    this.pageValueCount = page.getValueCount();
+    ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
+    ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+    this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
+    this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
+    try {
+      BytesInput bytes = page.getBytes();
+      LOG.debug("Page size {}  bytes and {} records.", bytes.size(), pageValueCount);
+      ByteBufferInputStream in = bytes.toInputStream();
+      LOG.debug("Reading repetition levels at {}.", in.position());
+      rlReader.initFromPage(pageValueCount, in);
+      LOG.debug("Reading definition levels at {}.", in.position());
+      dlReader.initFromPage(pageValueCount, in);
+      LOG.debug("Reading data at {}.", in.position());
+      initDataReader(page.getValueEncoding(), in, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+          String.format("Could not read page %s in col %s.", page, descriptor), e);
+    }
+  }
+
+  /**
+   * Initializes the repetition-level, definition-level and value readers for a V2 data page,
+   * whose levels are obtained from the page separately from its data.
+   *
+   * @param page the V2 data page to read
+   * @throws ParquetDecodingException if the page data cannot be read
+   */
+  private void readPageV2(DataPageV2 page) {
+    this.pageValueCount = page.getValueCount();
+
+    final int maxRepetition = descriptor.getMaxRepetitionLevel();
+    this.repetitionLevelColumn = newRLEIterator(maxRepetition, page.getRepetitionLevels());
+    final int maxDefinition = descriptor.getMaxDefinitionLevel();
+    this.definitionLevelColumn = newRLEIterator(maxDefinition, page.getDefinitionLevels());
+
+    try {
+      LOG.debug(
+          "Page data size {} bytes and {} records.", page.getData().size(), pageValueCount);
+      final ByteBufferInputStream dataStream = page.getData().toInputStream();
+      initDataReader(page.getDataEncoding(), dataStream, page.getValueCount());
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+          String.format("Could not read page %s in col %s.", page, descriptor), e);
+    }
+  }
+
+  /**
+   * Creates an iterator over level values encoded with the RLE / bit-packing hybrid.
+   *
+   * @param maxLevel maximum level of the column; {@code 0} yields an iterator that always
+   *     returns zero
+   * @param bytes encoded level bytes
+   * @return an iterator over the decoded levels
+   * @throws ParquetDecodingException if the level bytes cannot be read
+   */
+  private IntIterator newRLEIterator(int maxLevel, BytesInput bytes) {
+    if (maxLevel == 0) {
+      return new NullIntIterator();
+    }
+    final int bitWidth = BytesUtils.getWidthFromMaxInt(maxLevel);
+    try {
+      final ByteArrayInputStream levelStream = new ByteArrayInputStream(bytes.toByteArray());
+      return new RLEIntIterator(new RunLengthBitPackingHybridDecoder(bitWidth, levelStream));
+    } catch (IOException e) {
+      throw new ParquetDecodingException(
+          String.format("Could not read levels in page for col %s.", descriptor), e);
+    }
+  }
+
+  /** Abstraction over the different encodings from which level values are read as ints. */
+  interface IntIterator {
+    /** Returns the next int from the underlying source. */
+    int nextInt();
+  }
+
+  /** {@link IntIterator} that takes each int from a wrapped {@link ValuesReader}. */
+  protected static final class ValuesReaderIntIterator implements IntIterator {
+    ValuesReader delegate;
+
+    /** Wraps the given reader; every {@link #nextInt()} call reads one integer from it. */
+    public ValuesReaderIntIterator(ValuesReader valuesReader) {
+      delegate = valuesReader;
+    }
+
+    @Override
+    public int nextInt() {
+      return delegate.readInteger();
+    }
+  }
+
+  /** {@link IntIterator} that takes each int from a {@link RunLengthBitPackingHybridDecoder}. */
+  protected static final class RLEIntIterator implements IntIterator {
+    RunLengthBitPackingHybridDecoder delegate;
+
+    /** Wraps the given decoder; every {@link #nextInt()} call reads one int from it. */
+    public RLEIntIterator(RunLengthBitPackingHybridDecoder decoder) {
+      delegate = decoder;
+    }
+
+    /**
+     * Reads the next int from the decoder.
+     *
+     * @throws ParquetDecodingException wrapping any {@link IOException} raised by the decoder
+     */
+    @Override
+    public int nextInt() {
+      final int next;
+      try {
+        next = delegate.readInt();
+      } catch (IOException e) {
+        throw new ParquetDecodingException(e);
+      }
+      return next;
+    }
+  }
+
+  /** {@link IntIterator} that yields zero on every call. */
+  protected static final class NullIntIterator implements IntIterator {
+    /** Always returns {@code 0}. */
+    @Override
+    public int nextInt() {
+      return 0;
+    }
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
index fdfe5d6fa3a3..3748e53fc284 100644
--- 
a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ParquetDataColumnReaderFactory.java
@@ -26,12 +26,16 @@ import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.sql.Timestamp;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
 
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.JULIAN_EPOCH_OFFSET_DAYS;
 import static 
org.apache.flink.formats.parquet.vector.reader.TimestampColumnReader.MILLIS_IN_DAY;
@@ -252,21 +256,115 @@ public final class ParquetDataColumnReaderFactory {
     }
   }
 
+  /**
+   * Reader for Parquet INT64 timestamp values (MILLIS / MICROS / NANOS), i.e. the standard
+   * timestamp encoding defined by Parquet's
+   * {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} and the legacy
+   * {@link OriginalType#TIMESTAMP_MILLIS} / {@link OriginalType#TIMESTAMP_MICROS} annotations.
+   * (The older INT96 encoding is marked deprecated by the Parquet format spec, see
+   * <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#timestamp">
+   * LogicalTypes.md</a>, but is still supported here via {@link TypesFromInt96PageReader} for
+   * backwards compatibility with files written by older Hive / Spark / Impala versions.)
+   *
+   * <p>Used by {@link NestedPrimitiveColumnReader} when a TIMESTAMP column sits inside a
+   * {@code Row}, {@code Array} or {@code Map}; the top-level path continues to use
+   * {@link Int64TimestampColumnReader} for batched-vector efficiency.
+   */
+  public static class TypesFromInt64PageReader extends DefaultParquetDataColumnReader {
+    private final boolean isUtcTimestamp;
+    private final ChronoUnit chronoUnit;
+
+    /** Creates a reader that decodes timestamps from the values of a page. */
+    public TypesFromInt64PageReader(
+        ValuesReader realReader, boolean isUtcTimestamp, ChronoUnit chronoUnit) {
+      super(realReader);
+      this.chronoUnit = chronoUnit;
+      this.isUtcTimestamp = isUtcTimestamp;
+    }
+
+    /** Creates a reader that decodes timestamps from dictionary entries. */
+    public TypesFromInt64PageReader(
+        Dictionary dict, boolean isUtcTimestamp, ChronoUnit chronoUnit) {
+      super(dict);
+      this.chronoUnit = chronoUnit;
+      this.isUtcTimestamp = isUtcTimestamp;
+    }
+
+    /** Reads the next long from the values reader and converts it to a timestamp. */
+    @Override
+    public TimestampData readTimestamp() {
+      final long raw = valuesReader.readLong();
+      return int64ToTimestamp(isUtcTimestamp, raw, chronoUnit);
+    }
+
+    /** Decodes dictionary entry {@code id} as a long and converts it to a timestamp. */
+    @Override
+    public TimestampData readTimestamp(int id) {
+      final long raw = dict.decodeToLong(id);
+      return int64ToTimestamp(isUtcTimestamp, raw, chronoUnit);
+    }
+  }
+
+  /**
+   * Picks the {@link ParquetDataColumnReader} implementation for a Parquet primitive type.
+   *
+   * <p>INT96 columns get a {@link TypesFromInt96PageReader}; INT64 columns for which
+   * {@code resolveInt64TimestampUnit} finds a timestamp unit get a
+   * {@link TypesFromInt64PageReader}; every other column gets a
+   * {@link DefaultParquetDataColumnReader}.
+   *
+   * @param isDictionary whether to build the reader over {@code dictionary} rather than
+   *     {@code valuesReader}
+   * @param parquetType primitive type of the column
+   * @param dictionary dictionary handed to the reader when {@code isDictionary} is true
+   * @param valuesReader values reader handed to the reader when {@code isDictionary} is false
+   * @param isUtcTimestamp flag forwarded to the timestamp readers
+   * @return the selected reader
+   */
   private static ParquetDataColumnReader getDataColumnReaderByTypeHelper(
       boolean isDictionary,
       PrimitiveType parquetType,
       Dictionary dictionary,
       ValuesReader valuesReader,
       boolean isUtcTimestamp) {
-    if (parquetType.getPrimitiveTypeName() == 
PrimitiveType.PrimitiveTypeName.INT96) {
+    PrimitiveType.PrimitiveTypeName typeName = 
parquetType.getPrimitiveTypeName();
+    if (typeName == PrimitiveType.PrimitiveTypeName.INT96) {
       return isDictionary
           ? new TypesFromInt96PageReader(dictionary, isUtcTimestamp)
           : new TypesFromInt96PageReader(valuesReader, isUtcTimestamp);
-    } else {
-      return isDictionary
-          ? new DefaultParquetDataColumnReader(dictionary)
-          : new DefaultParquetDataColumnReader(valuesReader);
     }
+    if (typeName == PrimitiveType.PrimitiveTypeName.INT64) {
+      // A null unit means the INT64 column carries no timestamp annotation.
+      ChronoUnit unit = resolveInt64TimestampUnit(parquetType);
+      if (unit != null) {
+        return isDictionary
+            ? new TypesFromInt64PageReader(dictionary, isUtcTimestamp, unit)
+            : new TypesFromInt64PageReader(valuesReader, isUtcTimestamp, unit);
+      }
+    }
+    // Everything else, including un-annotated INT64, uses the default reader.
+    return isDictionary
+        ? new DefaultParquetDataColumnReader(dictionary)
+        : new DefaultParquetDataColumnReader(valuesReader);
+  }
+
+  /**
+   * Resolves the {@link ChronoUnit} in which a Parquet INT64 TIMESTAMP column stores its values.
+   *
+   * <p>A {@link LogicalTypeAnnotation.TimestampLogicalTypeAnnotation} takes precedence; without
+   * one, the legacy {@link OriginalType#TIMESTAMP_MILLIS} / {@link OriginalType#TIMESTAMP_MICROS}
+   * annotations are consulted.
+   *
+   * @param parquetType primitive type to inspect
+   * @return the unit of the timestamp values, or {@code null} if no supported timestamp
+   *     annotation is present
+   */
+  private static ChronoUnit resolveInt64TimestampUnit(PrimitiveType parquetType) {
+    final LogicalTypeAnnotation logicalAnnotation = parquetType.getLogicalTypeAnnotation();
+    if (!(logicalAnnotation instanceof LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)) {
+      // No modern annotation: fall back to the legacy original type.
+      final OriginalType legacyType = parquetType.getOriginalType();
+      if (legacyType == OriginalType.TIMESTAMP_MILLIS) {
+        return ChronoUnit.MILLIS;
+      }
+      return legacyType == OriginalType.TIMESTAMP_MICROS ? ChronoUnit.MICROS : null;
+    }
+
+    final LogicalTypeAnnotation.TimestampLogicalTypeAnnotation timestampAnnotation =
+        (LogicalTypeAnnotation.TimestampLogicalTypeAnnotation) logicalAnnotation;
+    switch (timestampAnnotation.getUnit()) {
+      case MILLIS:
+        return ChronoUnit.MILLIS;
+      case MICROS:
+        return ChronoUnit.MICROS;
+      case NANOS:
+        return ChronoUnit.NANOS;
+      default:
+        return null;
+    }
+  }
+
+  /**
+   * Converts an INT64 value counted in {@code unit} from the epoch into a {@link TimestampData}.
+   *
+   * @param utcTimestamp when true the instant is converted with {@code fromInstant}; otherwise
+   *     it goes through {@link Timestamp#from(Instant)} and {@code fromTimestamp}
+   * @param value amount of {@code unit} since {@link Instant#EPOCH}
+   * @param unit unit of {@code value}
+   * @return the converted timestamp
+   */
+  private static TimestampData int64ToTimestamp(
+      boolean utcTimestamp, long value, ChronoUnit unit) {
+    final Instant sinceEpoch = Instant.EPOCH.plus(value, unit);
+    return utcTimestamp
+        ? TimestampData.fromInstant(sinceEpoch)
+        : TimestampData.fromTimestamp(Timestamp.from(sinceEpoch));
   }
 
   public static ParquetDataColumnReader getDataColumnReaderByTypeOnDictionary(
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
new file mode 100644
index 000000000000..4b48fdd37460
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector;
+
+import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
+import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
+import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertSame;
+
+/**
+ * Verifies the Flink 2.1-compatible accessors added on {@link HeapArrayVector},
+ * {@link HeapMapColumnVector} and {@link HeapRowColumnVector} when vendoring Flink 2.1's
+ * nested-Parquet reader (FLINK-35702).
+ *
+ * <p>The accessors are wrappers over the existing public fields so legacy callers continue to
+ * work. These tests only pin down that wrapper contract; runtime correctness of the Dremel-style
+ * read path is exercised end-to-end by integration tests once {@code ParquetSplitReaderUtil} is
+ * wired up to {@code NestedColumnReader} in a follow-up PR.
+ */
+class TestHeapColumnVectorAccessors {
+
+  // -----------------------------------------------------------------------------------------------
+  // HeapArrayVector
+  // -----------------------------------------------------------------------------------------------
+
+  @Test
+  void heapArrayVectorAccessorsReflectPublicFields() {
+    final HeapArrayVector arrayVector = new HeapArrayVector(2, new HeapIntVector(4));
+    final long[] expectedOffsets = {0L, 2L};
+    final long[] expectedLengths = {2L, 2L};
+    final HeapLongVector expectedChild = new HeapLongVector(4);
+
+    arrayVector.setOffsets(expectedOffsets);
+    arrayVector.setLengths(expectedLengths);
+    arrayVector.setChild(expectedChild);
+    arrayVector.setSize(2);
+
+    assertArrayEquals(expectedOffsets, arrayVector.getOffsets());
+    assertArrayEquals(expectedLengths, arrayVector.getLengths());
+    assertSame(expectedChild, arrayVector.getChild());
+    assertEquals(2, arrayVector.getSize());
+
+    // The setters must write through to the public fields that legacy callers read directly.
+    assertSame(expectedOffsets, arrayVector.offsets);
+    assertSame(expectedLengths, arrayVector.lengths);
+    assertSame(expectedChild, arrayVector.child);
+  }
+
+  // -----------------------------------------------------------------------------------------------
+  // HeapMapColumnVector
+  // -----------------------------------------------------------------------------------------------
+
+  @Test
+  void heapMapColumnVectorConstructorInitializesOffsetsAndLengths() {
+    final HeapMapColumnVector mapVector =
+        new HeapMapColumnVector(3, new HeapIntVector(4), new HeapIntVector(4));
+
+    assertEquals(3, mapVector.getOffsets().length);
+    assertEquals(3, mapVector.getLengths().length);
+  }
+
+  @Test
+  void heapMapColumnVectorAccessorsReflectInternalState() {
+    final HeapMapColumnVector mapVector =
+        new HeapMapColumnVector(2, new HeapIntVector(4), new HeapIntVector(4));
+    final long[] expectedOffsets = {0L, 2L};
+    final long[] expectedLengths = {2L, 2L};
+    final HeapLongVector expectedKeys = new HeapLongVector(4);
+    final HeapLongVector expectedValues = new HeapLongVector(4);
+
+    mapVector.setOffsets(expectedOffsets);
+    mapVector.setLengths(expectedLengths);
+    mapVector.setKeys(expectedKeys);
+    mapVector.setValues(expectedValues);
+    mapVector.setSize(2);
+
+    assertArrayEquals(expectedOffsets, mapVector.getOffsets());
+    assertArrayEquals(expectedLengths, mapVector.getLengths());
+    assertSame(expectedKeys, mapVector.getKeys());
+    assertSame(expectedValues, mapVector.getValues());
+    // The Flink-2.1-style ColumnVector accessors expose the very same child vectors.
+    assertSame(expectedKeys, mapVector.getKeyColumnVector());
+    assertSame(expectedValues, mapVector.getValueColumnVector());
+    assertEquals(2, mapVector.getSize());
+  }
+
+  // -----------------------------------------------------------------------------------------------
+  // HeapRowColumnVector
+  // -----------------------------------------------------------------------------------------------
+
+  @Test
+  void heapRowColumnVectorFieldsAccessorsReflectPublicVectors() {
+    final HeapIntVector firstField = new HeapIntVector(2);
+    final HeapLongVector secondField = new HeapLongVector(2);
+    final HeapRowColumnVector rowVector = new HeapRowColumnVector(2, firstField, secondField);
+
+    final WritableColumnVector[] initialFields = rowVector.getFields();
+    assertEquals(2, initialFields.length);
+    assertSame(firstField, initialFields[0]);
+    assertSame(secondField, initialFields[1]);
+    // getFields() hands out the array held in the public field, not a copy.
+    assertSame(initialFields, rowVector.vectors);
+
+    final WritableColumnVector[] swappedFields = {new HeapIntVector(2), secondField};
+    rowVector.setFields(swappedFields);
+
+    assertSame(swappedFields, rowVector.getFields());
+    assertSame(swappedFields, rowVector.vectors);
+  }
+}
diff --git 
a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/reader/TestParquetDataColumnReaderFactory.java
 
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/reader/TestParquetDataColumnReaderFactory.java
new file mode 100644
index 000000000000..9d6607d03feb
--- /dev/null
+++ 
b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/reader/TestParquetDataColumnReaderFactory.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.format.cow.vector.reader;
+
+import org.apache.flink.table.data.TimestampData;
+
+import org.apache.parquet.column.Dictionary;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.OriginalType;
+import org.apache.parquet.schema.PrimitiveType;
+import org.apache.parquet.schema.Types;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Tests for the {@link ParquetDataColumnReaderFactory} INT64 timestamp 
dispatch added when
+ * vendoring Flink 2.1's nested-Parquet reader (FLINK-35702).
+ *
+ * <p>The factory is exercised end-to-end by integration tests through
+ * {@link NestedPrimitiveColumnReader}; this unit test focuses on the small, 
deterministic piece
+ * that was added by this PR — selecting the right {@code 
ParquetDataColumnReader} for each
+ * supported INT64 TIMESTAMP encoding (modern {@link 
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation}
+ * MILLIS / MICROS / NANOS plus the legacy {@link OriginalType} encodings) and 
decoding values
+ * using both the values-reader and dictionary code paths.
+ */
+class TestParquetDataColumnReaderFactory {
+
+  // 
-----------------------------------------------------------------------------------------------
+  // Type dispatch
+  // 
-----------------------------------------------------------------------------------------------
+
+  /** An INT96 column must be served by the INT96 reader. */
+  @Test
+  void valuesReaderDispatchInt96TimestampUsesInt96Reader() {
+    final PrimitiveType int96Type =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT96).named("ts");
+
+    final ParquetDataColumnReader selected =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            int96Type, new StubValuesReader(), true);
+
+    assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt96PageReader.class, selected);
+  }
+
+  /** A plain INT64 column (no timestamp annotation) must get exactly the default reader. */
+  @Test
+  void valuesReaderDispatchInt64WithoutAnnotationUsesDefaultReader() {
+    PrimitiveType type = Types.required(PrimitiveType.PrimitiveTypeName.INT64).named("plainLong");
+    ParquetDataColumnReader reader =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+    // Compare exact classes: TypesFromInt64PageReader extends DefaultParquetDataColumnReader, so
+    // assertInstanceOf would still pass if the timestamp reader were selected by mistake.
+    assertEquals(
+        ParquetDataColumnReaderFactory.DefaultParquetDataColumnReader.class, reader.getClass());
+  }
+
+  /** INT64 with a MILLIS timestamp logical type is routed to the INT64 timestamp reader. */
+  @Test
+  void valuesReaderDispatchInt64TimestampMillisLogicalUsesInt64Reader() {
+    final LogicalTypeAnnotation millisTimestamp =
+        LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
+    final PrimitiveType annotated =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64).as(millisTimestamp).named("ts");
+
+    final ParquetDataColumnReader selected =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            annotated, new StubValuesReader(), true);
+
+    assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, selected);
+  }
+
+  /** INT64 with a MICROS timestamp logical type is routed to the INT64 timestamp reader. */
+  @Test
+  void valuesReaderDispatchInt64TimestampMicrosLogicalUsesInt64Reader() {
+    final LogicalTypeAnnotation microsTimestamp =
+        LogicalTypeAnnotation.timestampType(false, LogicalTypeAnnotation.TimeUnit.MICROS);
+    final PrimitiveType annotated =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64).as(microsTimestamp).named("ts");
+
+    final ParquetDataColumnReader selected =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            annotated, new StubValuesReader(), true);
+
+    assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, selected);
+  }
+
+  /** INT64 with a NANOS timestamp logical type is routed to the INT64 timestamp reader. */
+  @Test
+  void valuesReaderDispatchInt64TimestampNanosLogicalUsesInt64Reader() {
+    final LogicalTypeAnnotation nanosTimestamp =
+        LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.NANOS);
+    final PrimitiveType annotated =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64).as(nanosTimestamp).named("ts");
+
+    final ParquetDataColumnReader selected =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            annotated, new StubValuesReader(), true);
+
+    assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, selected);
+  }
+
+  /** INT64 with the legacy TIMESTAMP_MILLIS original type is routed to the INT64 reader. */
+  @Test
+  void valuesReaderDispatchInt64LegacyTimestampMillisOriginalUsesInt64Reader() {
+    final PrimitiveType legacyMillis =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+            .as(OriginalType.TIMESTAMP_MILLIS)
+            .named("ts");
+
+    final ParquetDataColumnReader selected =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            legacyMillis, new StubValuesReader(), true);
+
+    assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, selected);
+  }
+
+  /** INT64 with the legacy TIMESTAMP_MICROS original type is routed to the INT64 reader. */
+  @Test
+  void valuesReaderDispatchInt64LegacyTimestampMicrosOriginalUsesInt64Reader() {
+    final PrimitiveType legacyMicros =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+            .as(OriginalType.TIMESTAMP_MICROS)
+            .named("ts");
+
+    final ParquetDataColumnReader selected =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            legacyMicros, new StubValuesReader(), true);
+
+    assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, selected);
+  }
+
+  /** An INT32 column must get exactly the default reader, never a timestamp reader. */
+  @Test
+  void valuesReaderDispatchInt32DoesNotUseTimestampReader() {
+    PrimitiveType type = Types.required(PrimitiveType.PrimitiveTypeName.INT32).named("i");
+    ParquetDataColumnReader reader =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(type, new StubValuesReader(), true);
+    // Compare exact classes: TypesFromInt64PageReader extends DefaultParquetDataColumnReader, so
+    // assertInstanceOf could not detect a timestamp reader being returned here.
+    assertEquals(
+        ParquetDataColumnReaderFactory.DefaultParquetDataColumnReader.class, reader.getClass());
+  }
+
+  /** The dictionary entry point applies the same INT64 timestamp dispatch. */
+  @Test
+  void dictionaryReaderDispatchInt64TimestampMillisLogicalUsesInt64Reader() {
+    final LogicalTypeAnnotation millisTimestamp =
+        LogicalTypeAnnotation.timestampType(true, LogicalTypeAnnotation.TimeUnit.MILLIS);
+    final PrimitiveType annotated =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64).as(millisTimestamp).named("ts");
+
+    final ParquetDataColumnReader selected =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+            annotated, new StubDictionary(), true);
+
+    assertInstanceOf(ParquetDataColumnReaderFactory.TypesFromInt64PageReader.class, selected);
+  }
+
+  // -----------------------------------------------------------------------------------------------
+  // INT64 → TimestampData decoding (per ChronoUnit, both UTC and local-time-zone branches)
+  // -----------------------------------------------------------------------------------------------
+
+  @Test // decoding check: an INT64 MILLIS value read through a ValuesReader becomes a TimestampData
+  void int64ReaderReadsTimestampMillisFromValuesReaderInUtc() {
+    PrimitiveType type =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+            .as(LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.MILLIS))
+            .named("ts");
+    long epochMillis = 1_700_000_000_123L;
+    ParquetDataColumnReader reader =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            type, new StubValuesReader(epochMillis), true); // NOTE(review): the boolean is presumably the UTC-timestamp flag — confirm
+
+    TimestampData ts = reader.readTimestamp(); // StubValuesReader.readLong() supplies epochMillis
+    assertNotNull(ts);
+    assertEquals(epochMillis, ts.getMillisecond()); // the value is already in milliseconds, so it must round-trip unchanged
+    assertEquals(0, ts.getNanoOfMillisecond()); // millisecond input has no sub-millisecond part
+  }
+
+  @Test // decoding check: an INT64 MICROS value is split into milliseconds plus nanos-of-millisecond
+  void int64ReaderReadsTimestampMicrosFromValuesReaderInUtc() {
+    PrimitiveType type =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+            .as(LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.MICROS))
+            .named("ts");
+    long epochMicros = 1_700_000_000_123_456L;
+    ParquetDataColumnReader reader =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            type, new StubValuesReader(epochMicros), true); // the stub returns epochMicros from readLong()
+
+    TimestampData ts = reader.readTimestamp();
+    assertNotNull(ts);
+    assertEquals(epochMicros / 1_000L, ts.getMillisecond()); // 1_000 microseconds per millisecond
+    // 456 microseconds remain → 456_000 nanoseconds within the millisecond
+    assertEquals(456_000, ts.getNanoOfMillisecond());
+  }
+
+  @Test // decoding check: an INT64 NANOS value is split into milliseconds plus nanos-of-millisecond
+  void int64ReaderReadsTimestampNanosFromValuesReaderInUtc() {
+    PrimitiveType type =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+            .as(LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.NANOS))
+            .named("ts");
+    long epochNanos = 1_700_000_000_123_456_789L;
+    ParquetDataColumnReader reader =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByType(
+            type, new StubValuesReader(epochNanos), true); // the stub returns epochNanos from readLong()
+
+    TimestampData ts = reader.readTimestamp();
+    assertNotNull(ts);
+    assertEquals(epochNanos / 1_000_000L, ts.getMillisecond()); // 1_000_000 nanoseconds per millisecond
+    assertEquals(456_789, ts.getNanoOfMillisecond()); // remainder: epochNanos % 1_000_000 == 456_789
+  }
+
+  @Test // decoding check: an INT64 MILLIS value looked up through a Dictionary becomes a TimestampData
+  void int64ReaderReadsTimestampMillisFromDictionaryInUtc() {
+    PrimitiveType type =
+        Types.required(PrimitiveType.PrimitiveTypeName.INT64)
+            .as(LogicalTypeAnnotation.timestampType(true, 
LogicalTypeAnnotation.TimeUnit.MILLIS))
+            .named("ts");
+    long epochMillis = 1_700_000_000_456L;
+    ParquetDataColumnReader reader =
+        ParquetDataColumnReaderFactory.getDataColumnReaderByTypeOnDictionary(
+            type, new StubDictionary(epochMillis), true);
+
+    TimestampData ts = reader.readTimestamp(0); // id 0 is arbitrary: StubDictionary.decodeToLong ignores the id
+    assertNotNull(ts);
+    assertEquals(epochMillis, ts.getMillisecond()); // NOTE(review): nano-of-millisecond is not asserted on this path
+  }
+
+  // -----------------------------------------------------------------------------------------------
+  // Stubs (only the methods exercised by the dispatch + decoding tests above)
+  // -----------------------------------------------------------------------------------------------
+
+  /**
+   * Minimal {@link ValuesReader} test double.
+   *
+   * <p>{@link #readLong()} always returns the long supplied at construction (0 when the no-arg
+   * constructor is used) and {@link #skip()} is overridden as a no-op. No other method is
+   * overridden here; they are expected to throw, presumably via the {@link ValuesReader}
+   * base-class defaults (confirm against the Parquet version in use).
+   */
+  private static final class StubValuesReader extends ValuesReader {
+    private final long fixedLong; // value handed back by every readLong() call
+
+    StubValuesReader() {
+      this(0L); // used by dispatch-only tests, which never read a value
+    }
+
+    StubValuesReader(long fixedLong) {
+      this.fixedLong = fixedLong;
+    }
+
+    @Override
+    public long readLong() {
+      return fixedLong; // no position is tracked: repeated calls return the same value
+    }
+
+    @Override
+    public void skip() {
+      // unused
+    }
+  }
+
+  /**
+   * Minimal {@link Dictionary} test double.
+   *
+   * <p>{@link #decodeToLong(int)} returns the long supplied at construction (0 when the no-arg
+   * constructor is used) for any id, {@link #getMaxId()} always returns 0, and
+   * {@link #decodeToBinary(int)} throws {@link UnsupportedOperationException}. No other decode
+   * method is overridden here.
+   */
+  private static final class StubDictionary extends Dictionary {
+    private final long fixedLong; // value handed back by decodeToLong for every id
+
+    StubDictionary() {
+      this(0L); // used by dispatch-only tests, which never decode a value
+    }
+
+    StubDictionary(long fixedLong) {
+      super(null); // NOTE(review): null is presumably the base class's encoding argument — confirm
+      this.fixedLong = fixedLong;
+    }
+
+    @Override
+    public Binary decodeToBinary(int id) {
+      throw new UnsupportedOperationException(); // binary decoding is not needed by the INT64 tests
+    }
+
+    @Override
+    public long decodeToLong(int id) {
+      return fixedLong; // the id is ignored
+    }
+
+    @Override
+    public int getMaxId() {
+      return 0;
+    }
+  }
+}

Reply via email to