This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 004d15996896 refactor(flink): Remove legacy Parquet nested readers superseded by Flink 2.1 Dremel path (FLINK-35702) (#18701)
004d15996896 is described below

commit 004d159968964d17875adb0ba97a51a206dfe1ff
Author: Shihuan Liu <[email protected]>
AuthorDate: Sat May 9 05:16:58 2026 -0700

    refactor(flink): Remove legacy Parquet nested readers superseded by Flink 2.1 Dremel path (FLINK-35702) (#18701)
    
    * refactor(flink): Remove legacy Parquet nested readers superseded by Flink 2.1 Dremel path (FLINK-35702)
    * Fix flaky IT test
---
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  42 +-
 .../format/cow/vector/ColumnarGroupArrayData.java  | 179 --------
 .../format/cow/vector/ColumnarGroupMapData.java    |  63 ---
 .../format/cow/vector/ColumnarGroupRowData.java    | 138 ------
 .../cow/vector/HeapArrayGroupColumnVector.java     |  53 ---
 .../cow/vector/reader/ArrayColumnReader.java       | 473 ---------------------
 .../format/cow/vector/reader/ArrayGroupReader.java |  44 --
 .../format/cow/vector/reader/MapColumnReader.java  |  56 ---
 .../cow/vector/reader/NestedColumnReader.java      |  15 +-
 .../vector/reader/NestedPrimitiveColumnReader.java |   4 +-
 .../format/cow/vector/reader/RowColumnReader.java  |  63 ---
 .../cow/vector/TestHeapColumnVectorAccessors.java  |   5 +-
 12 files changed, 53 insertions(+), 1082 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 37d5e9169ff3..e62f05f109fc 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -3550,11 +3550,51 @@ public class ITTestHoodieDataSource {
       // and max waiting timeout is 30s
       tableResult.await(30, TimeUnit.SECONDS);
     } catch (Throwable e) {
-      ExceptionUtils.assertThrowable(e, CollectSinkTableFactory.SuccessException.class);
+      // Acceptable terminal causes:
+      //   1. SuccessException: the sink reached its expected row count and intentionally
+      //      threw to terminate the streaming job. This is the happy path.
+      //   2. IOException("Stream is closed!") wrapped as HoodieIOException: a benign
+      //      error-attribution race between the source-side cascading-shutdown path and
+      //      the sink-side SuccessException terminator. When the sink throws
+      //      SuccessException to end the job, the chained source's SplitFetcher can close
+      //      the underlying Hadoop FSDataInputStream while the mailbox is still draining
+      //      a BatchRecords queued earlier; the next row-group read on the now-closed
+      //      stream surfaces an IOException("Stream is closed!"). With
+      //      restart-strategy.fixed-delay.attempts=0 (set in beforeEach to keep tests
+      //      deterministic) that IOException becomes the job's reported failure cause
+      //      instead of the sink's SuccessException, even though the sink has already
+      //      collected the expected rows by then - i.e. the functional outcome is
+      //      unchanged, only the error-attribution differs. Production paths correctly
+      //      fail the job on stream-closed-mid-read (the right behavior for real I/O
+      //      failures), so this tolerance is scoped to the SuccessException-based test
+      //      pattern below and is NOT mirrored in production code.
+      if (!isAcceptableTerminalFailure(e)) {
+        throw new AssertionError("Unexpected job failure", e);
+      }
     }
     tEnv.executeSql("DROP TABLE IF EXISTS sink");
     return CollectSinkTableFactory.RESULT.values().stream()
         .flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
+
+  /**
+   * Whether {@code e} (or any of its causes) is one of the terminal failures that
+   * {@link #fetchResultWithExpectedNum} is allowed to swallow. See the comment at the call
+   * site for the rationale.
+   */
+  private static boolean isAcceptableTerminalFailure(Throwable e) {
+    Throwable cur = e;
+    while (cur != null) {
+      if (cur instanceof CollectSinkTableFactory.SuccessException) {
+        return true;
+      }
+      String msg = cur.getMessage();
+      if (msg != null && msg.contains("Stream is closed")) {
+        return true;
+      }
+      cur = cur.getCause();
+    }
+    return false;
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java
deleted file mode 100644
index 4c9275f3b093..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupArrayData.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-
-public class ColumnarGroupArrayData implements ArrayData {
-
-  WritableColumnVector vector;
-  int rowId;
-
-  public ColumnarGroupArrayData(WritableColumnVector vector, int rowId) {
-    this.vector = vector;
-    this.rowId = rowId;
-  }
-
-  @Override
-  public int size() {
-    if (vector == null) {
-      return 0;
-    }
-
-    if (vector instanceof HeapRowColumnVector) {
-      // assume all fields have the same size
-      if (((HeapRowColumnVector) vector).vectors == null || 
((HeapRowColumnVector) vector).vectors.length == 0) {
-        return 0;
-      }
-      return ((HeapArrayVector) ((HeapRowColumnVector) 
vector).vectors[0]).getArray(rowId).size();
-    }
-    throw new UnsupportedOperationException(vector.getClass().getName() + " is 
not supported. Supported vector types: HeapRowColumnVector");
-  }
-
-  @Override
-  public boolean isNullAt(int index) {
-    if (vector == null) {
-      return true;
-    }
-
-    if (vector instanceof HeapRowColumnVector) {
-      return ((HeapRowColumnVector) vector).vectors == null;
-    }
-
-    throw new UnsupportedOperationException(vector.getClass().getName() + " is 
not supported. Supported vector types: HeapRowColumnVector");
-  }
-
-  @Override
-  public boolean getBoolean(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public byte getByte(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public short getShort(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public int getInt(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public long getLong(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public float getFloat(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public double getDouble(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public StringData getString(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public DecimalData getDecimal(int index, int precision, int scale) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public TimestampData getTimestamp(int index, int precision) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public <T> RawValueData<T> getRawValue(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public byte[] getBinary(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public ArrayData getArray(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public MapData getMap(int index) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public RowData getRow(int index, int numFields) {
-    return new ColumnarGroupRowData((HeapRowColumnVector) vector, rowId, 
index);
-  }
-
-  @Override
-  public boolean[] toBooleanArray() {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public byte[] toByteArray() {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public short[] toShortArray() {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public int[] toIntArray() {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public long[] toLongArray() {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public float[] toFloatArray() {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public double[] toDoubleArray() {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java
deleted file mode 100644
index 69cb6feca13e..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupMapData.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.MapData;
-import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-
-public class ColumnarGroupMapData implements MapData {
-
-  WritableColumnVector keyVector;
-  WritableColumnVector valueVector;
-  int rowId;
-
-  public ColumnarGroupMapData(WritableColumnVector keyVector, 
WritableColumnVector valueVector, int rowId) {
-    this.keyVector = keyVector;
-    this.valueVector = valueVector;
-    this.rowId = rowId;
-  }
-
-  @Override
-  public int size() {
-    if (keyVector == null) {
-      return 0;
-    }
-
-    if (keyVector instanceof HeapArrayVector) {
-      return ((HeapArrayVector) keyVector).getArray(rowId).size();
-    }
-    throw new UnsupportedOperationException(keyVector.getClass().getName() + " 
is not supported. Supported vector types: HeapArrayVector");
-  }
-
-  @Override
-  public ArrayData keyArray() {
-    return ((HeapArrayVector) keyVector).getArray(rowId);
-  }
-
-  @Override
-  public ArrayData valueArray() {
-    if (valueVector instanceof HeapArrayVector) {
-      return ((HeapArrayVector) valueVector).getArray(rowId);
-    } else if (valueVector instanceof HeapArrayGroupColumnVector) {
-      return ((HeapArrayGroupColumnVector) valueVector).getArray(rowId);
-    }
-    throw new UnsupportedOperationException(valueVector.getClass().getName() + 
" is not supported. Supported vector types: HeapArrayVector, 
HeapArrayGroupColumnVector");
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java
deleted file mode 100644
index 439c1880823f..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/ColumnarGroupRowData.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.DecimalData;
-import org.apache.flink.table.data.MapData;
-import org.apache.flink.table.data.RawValueData;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.data.StringData;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.types.RowKind;
-
-public class ColumnarGroupRowData implements RowData {
-
-  HeapRowColumnVector vector;
-  int rowId;
-  int index;
-
-  public ColumnarGroupRowData(HeapRowColumnVector vector, int rowId, int 
index) {
-    this.vector = vector;
-    this.rowId = rowId;
-    this.index = index;
-  }
-
-  @Override
-  public int getArity() {
-    return vector.vectors.length;
-  }
-
-  @Override
-  public RowKind getRowKind() {
-    return RowKind.INSERT;
-  }
-
-  @Override
-  public void setRowKind(RowKind rowKind) {
-    throw new UnsupportedOperationException("Not support the operation!");
-  }
-
-  @Override
-  public boolean isNullAt(int pos) {
-    return
-        vector.vectors[pos].isNullAt(rowId)
-            || ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).isNullAt(index);
-  }
-
-  @Override
-  public boolean getBoolean(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getBoolean(index);
-  }
-
-  @Override
-  public byte getByte(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getByte(index);
-  }
-
-  @Override
-  public short getShort(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getShort(index);
-  }
-
-  @Override
-  public int getInt(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getInt(index);
-  }
-
-  @Override
-  public long getLong(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getLong(index);
-  }
-
-  @Override
-  public float getFloat(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getFloat(index);
-  }
-
-  @Override
-  public double getDouble(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getDouble(index);
-  }
-
-  @Override
-  public StringData getString(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getString(index);
-  }
-
-  @Override
-  public DecimalData getDecimal(int pos, int i1, int i2) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getDecimal(index, i1, i2);
-  }
-
-  @Override
-  public TimestampData getTimestamp(int pos, int i1) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getTimestamp(index, i1);
-  }
-
-  @Override
-  public <T> RawValueData<T> getRawValue(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getRawValue(index);
-  }
-
-  @Override
-  public byte[] getBinary(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getBinary(index);
-  }
-
-  @Override
-  public ArrayData getArray(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getArray(index);
-  }
-
-  @Override
-  public MapData getMap(int pos) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getMap(index);
-  }
-
-  @Override
-  public RowData getRow(int pos, int numFields) {
-    return ((HeapArrayVector) 
(vector.vectors[pos])).getArray(rowId).getRow(index, numFields);
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java
deleted file mode 100644
index 3d7d8b1f0de0..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/HeapArrayGroupColumnVector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector;
-
-import org.apache.flink.table.data.ArrayData;
-import org.apache.flink.table.data.columnar.vector.ArrayColumnVector;
-import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
-import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-
-/**
- * This class represents a nullable heap row column vector.
- */
-public class HeapArrayGroupColumnVector extends AbstractHeapVector
-    implements WritableColumnVector, ArrayColumnVector {
-
-  public WritableColumnVector vector;
-
-  public HeapArrayGroupColumnVector(int len) {
-    super(len);
-  }
-
-  public HeapArrayGroupColumnVector(int len, WritableColumnVector vector) {
-    super(len);
-    this.vector = vector;
-  }
-
-  @Override
-  public ArrayData getArray(int rowId) {
-    return new ColumnarGroupArrayData(vector, rowId);
-  }
-
-  @Override
-  public void reset() {
-    super.reset();
-    vector.reset();
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
deleted file mode 100644
index d758f35078d8..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayColumnReader.java
+++ /dev/null
@@ -1,473 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector.reader;
-
-import org.apache.hudi.table.format.cow.vector.HeapArrayVector;
-import org.apache.hudi.table.format.cow.vector.ParquetDecimalVector;
-
-import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.TimestampData;
-import org.apache.flink.table.data.columnar.vector.VectorizedColumnBatch;
-import org.apache.flink.table.data.columnar.vector.heap.HeapBooleanVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapByteVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapBytesVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapDoubleVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapFloatVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapIntVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapLongVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapShortVector;
-import org.apache.flink.table.data.columnar.vector.heap.HeapTimestampVector;
-import 
org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-import org.apache.flink.table.types.logical.ArrayType;
-import org.apache.flink.table.types.logical.LogicalType;
-import org.apache.parquet.column.ColumnDescriptor;
-import org.apache.parquet.column.page.PageReader;
-import org.apache.parquet.schema.PrimitiveType;
-import org.apache.parquet.schema.Type;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Array {@link ColumnReader}.
- */
-public class ArrayColumnReader extends BaseVectorizedColumnReader {
-
-  // The value read in last time
-  private Object lastValue;
-
-  // flag to indicate if there is no data in parquet data page
-  private boolean eof = false;
-
-  // flag to indicate if it's the first time to read parquet data page with 
this instance
-  boolean isFirstRow = true;
-
-  public ArrayColumnReader(
-      ColumnDescriptor descriptor,
-      PageReader pageReader,
-      boolean isUtcTimestamp,
-      Type type,
-      LogicalType logicalType)
-      throws IOException {
-    super(descriptor, pageReader, isUtcTimestamp, type, logicalType);
-  }
-
-  @Override
-  public void readToVector(int readNumber, WritableColumnVector vector) throws 
IOException {
-    HeapArrayVector lcv = (HeapArrayVector) vector;
-    // before readBatch, initial the size of offsets & lengths as the default 
value,
-    // the actual size will be assigned in setChildrenInfo() after reading 
complete.
-    lcv.offsets = new long[VectorizedColumnBatch.DEFAULT_SIZE];
-    lcv.lengths = new long[VectorizedColumnBatch.DEFAULT_SIZE];
-    // Because the length of ListColumnVector.child can't be known now,
-    // the valueList will save all data for ListColumnVector temporary.
-    List<Object> valueList = new ArrayList<>();
-
-    LogicalType category = ((ArrayType) logicalType).getElementType();
-
-    // read the first row in parquet data page, this will be only happened 
once for this
-    // instance
-    if (isFirstRow) {
-      if (!fetchNextValue(category)) {
-        return;
-      }
-      isFirstRow = false;
-    }
-
-    int index = collectDataFromParquetPage(readNumber, lcv, valueList, 
category);
-
-    // Convert valueList to array for the ListColumnVector.child
-    fillColumnVector(category, lcv, valueList, index);
-  }
-
-  /**
-   * Reads a single value from parquet page, puts it into lastValue. Returns a 
boolean indicating
-   * if there is more values to read (true).
-   *
-   * @param category
-   * @return boolean
-   * @throws IOException
-   */
-  private boolean fetchNextValue(LogicalType category) throws IOException {
-    int left = readPageIfNeed();
-    if (left > 0) {
-      // get the values of repetition and definitionLevel
-      readRepetitionAndDefinitionLevels();
-      // read the data if it isn't null
-      if (definitionLevel == maxDefLevel) {
-        if (isCurrentPageDictionaryEncoded) {
-          lastValue = dataColumn.readValueDictionaryId();
-        } else {
-          lastValue = readPrimitiveTypedRow(category);
-        }
-      } else {
-        lastValue = null;
-      }
-      return true;
-    } else {
-      eof = true;
-      return false;
-    }
-  }
-
-  private int readPageIfNeed() throws IOException {
-    // Compute the number of values we want to read in this page.
-    int leftInPage = (int) (endOfPageValueCount - valuesRead);
-    if (leftInPage == 0) {
-      // no data left in current page, load data from new page
-      readPage();
-      leftInPage = (int) (endOfPageValueCount - valuesRead);
-    }
-    return leftInPage;
-  }
-
-  // Need to be in consistent with that 
VectorizedPrimitiveColumnReader#readBatchHelper
-  // TODO Reduce the duplicated code
-  private Object readPrimitiveTypedRow(LogicalType category) {
-    switch (category.getTypeRoot()) {
-      case CHAR:
-      case VARCHAR:
-      case BINARY:
-      case VARBINARY:
-        return dataColumn.readString();
-      case BOOLEAN:
-        return dataColumn.readBoolean();
-      case TIME_WITHOUT_TIME_ZONE:
-      case DATE:
-      case INTEGER:
-        return dataColumn.readInteger();
-      case TINYINT:
-        return dataColumn.readTinyInt();
-      case SMALLINT:
-        return dataColumn.readSmallInt();
-      case BIGINT:
-        return dataColumn.readLong();
-      case FLOAT:
-        return dataColumn.readFloat();
-      case DOUBLE:
-        return dataColumn.readDouble();
-      case DECIMAL:
-        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
-          case INT32:
-            return dataColumn.readInteger();
-          case INT64:
-            return dataColumn.readLong();
-          case BINARY:
-          case FIXED_LEN_BYTE_ARRAY:
-            return dataColumn.readString();
-          default:
-            throw new AssertionError();
-        }
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return dataColumn.readTimestamp();
-      default:
-        throw new RuntimeException("Unsupported type in the list: " + type);
-    }
-  }
-
-  private Object dictionaryDecodeValue(LogicalType category, Integer 
dictionaryValue) {
-    if (dictionaryValue == null) {
-      return null;
-    }
-
-    switch (category.getTypeRoot()) {
-      case CHAR:
-      case VARCHAR:
-      case BINARY:
-      case VARBINARY:
-        return dictionary.readString(dictionaryValue);
-      case DATE:
-      case TIME_WITHOUT_TIME_ZONE:
-      case INTEGER:
-        return dictionary.readInteger(dictionaryValue);
-      case BOOLEAN:
-        return dictionary.readBoolean(dictionaryValue) ? 1 : 0;
-      case DOUBLE:
-        return dictionary.readDouble(dictionaryValue);
-      case FLOAT:
-        return dictionary.readFloat(dictionaryValue);
-      case TINYINT:
-        return dictionary.readTinyInt(dictionaryValue);
-      case SMALLINT:
-        return dictionary.readSmallInt(dictionaryValue);
-      case BIGINT:
-        return dictionary.readLong(dictionaryValue);
-      case DECIMAL:
-        switch (descriptor.getPrimitiveType().getPrimitiveTypeName()) {
-          case INT32:
-            return dictionary.readInteger(dictionaryValue);
-          case INT64:
-            return dictionary.readLong(dictionaryValue);
-          case FIXED_LEN_BYTE_ARRAY:
-          case BINARY:
-            return dictionary.readString(dictionaryValue);
-          default:
-            throw new AssertionError();
-        }
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        return dictionary.readTimestamp(dictionaryValue);
-      default:
-        throw new RuntimeException("Unsupported type in the list: " + type);
-    }
-  }
-
-  /**
-   * Collects data from a parquet page and returns the final row index where 
it stopped. The
-   * returned index can be equal to or less than total.
-   *
-   * @param total     maximum number of rows to collect
-   * @param lcv       column vector to do initial setup in data collection time
-   * @param valueList collection of values that will be fed into the vector 
later
-   * @param category
-   * @return int
-   * @throws IOException
-   */
-  private int collectDataFromParquetPage(
-      int total, HeapArrayVector lcv, List<Object> valueList, LogicalType 
category)
-      throws IOException {
-    int index = 0;
-    /*
-     * Here is a nested loop for collecting all values from a parquet page.
-     * A column of array type can be considered as a list of lists, so the two 
loops are as below:
-     * 1. The outer loop iterates on rows (index is a row index, so points to 
a row in the batch), e.g.:
-     * [0, 2, 3]    <- index: 0
-     * [NULL, 3, 4] <- index: 1
-     *
-     * 2. The inner loop iterates on values within a row (sets all data from 
parquet data page
-     * for an element in ListColumnVector), so fetchNextValue returns values 
one-by-one:
-     * 0, 2, 3, NULL, 3, 4
-     *
-     * As described below, the repetition level (repetitionLevel != 0)
-     * can be used to decide when we'll start to read values for the next list.
-     */
-    while (!eof && index < total) {
-      // add element to ListColumnVector one by one
-      lcv.offsets[index] = valueList.size();
-      /*
-       * Let's collect all values for a single list.
-       * Repetition level = 0 means that a new list started there in the 
parquet page,
-       * in that case, let's exit from the loop, and start to collect value 
for a new list.
-       */
-      do {
-        /*
-         * Definition level = 0 when a NULL value was returned instead of a 
list
-         * (this is not the same as a NULL value in of a list).
-         */
-        if (definitionLevel == 0) {
-          lcv.setNullAt(index);
-        }
-        valueList.add(
-            isCurrentPageDictionaryEncoded
-                ? dictionaryDecodeValue(category, (Integer) lastValue)
-                : lastValue);
-      } while (fetchNextValue(category) && (repetitionLevel != 0));
-
-      lcv.lengths[index] = valueList.size() - lcv.offsets[index];
-      index++;
-    }
-    return index;
-  }
-
-  /**
-   * The lengths & offsets will be initialized as default size (1024), it should be set to the
-   * actual size according to the element number.
-   */
-  private void setChildrenInfo(HeapArrayVector lcv, int itemNum, int elementNum) {
-    lcv.setSize(itemNum);
-    long[] lcvLength = new long[elementNum];
-    long[] lcvOffset = new long[elementNum];
-    System.arraycopy(lcv.lengths, 0, lcvLength, 0, elementNum);
-    System.arraycopy(lcv.offsets, 0, lcvOffset, 0, elementNum);
-    lcv.lengths = lcvLength;
-    lcv.offsets = lcvOffset;
-  }
-
-  private void fillColumnVector(
-      LogicalType category, HeapArrayVector lcv, List valueList, int elementNum) {
-    int total = valueList.size();
-    setChildrenInfo(lcv, total, elementNum);
-    switch (category.getTypeRoot()) {
-      case CHAR:
-      case VARCHAR:
-      case BINARY:
-      case VARBINARY:
-        lcv.child = new HeapBytesVector(total);
-        ((HeapBytesVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          byte[] src = ((List<byte[]>) valueList).get(i);
-          if (src == null) {
-            ((HeapBytesVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapBytesVector) lcv.child).appendBytes(i, src, 0, src.length);
-          }
-        }
-        break;
-      case BOOLEAN:
-        lcv.child = new HeapBooleanVector(total);
-        ((HeapBooleanVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapBooleanVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapBooleanVector) lcv.child).vector[i] =
-                ((List<Boolean>) valueList).get(i);
-          }
-        }
-        break;
-      case TINYINT:
-        lcv.child = new HeapByteVector(total);
-        ((HeapByteVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapByteVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapByteVector) lcv.child).vector[i] =
-                (byte) ((List<Integer>) valueList).get(i).intValue();
-          }
-        }
-        break;
-      case SMALLINT:
-        lcv.child = new HeapShortVector(total);
-        ((HeapShortVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapShortVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapShortVector) lcv.child).vector[i] =
-                (short) ((List<Integer>) valueList).get(i).intValue();
-          }
-        }
-        break;
-      case INTEGER:
-      case DATE:
-      case TIME_WITHOUT_TIME_ZONE:
-        lcv.child = new HeapIntVector(total);
-        ((HeapIntVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapIntVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapIntVector) lcv.child).vector[i] = ((List<Integer>) valueList).get(i);
-          }
-        }
-        break;
-      case FLOAT:
-        lcv.child = new HeapFloatVector(total);
-        ((HeapFloatVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapFloatVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapFloatVector) lcv.child).vector[i] = ((List<Float>) valueList).get(i);
-          }
-        }
-        break;
-      case BIGINT:
-        lcv.child = new HeapLongVector(total);
-        ((HeapLongVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapLongVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapLongVector) lcv.child).vector[i] = ((List<Long>) valueList).get(i);
-          }
-        }
-        break;
-      case DOUBLE:
-        lcv.child = new HeapDoubleVector(total);
-        ((HeapDoubleVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapDoubleVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapDoubleVector) lcv.child).vector[i] =
-                ((List<Double>) valueList).get(i);
-          }
-        }
-        break;
-      case TIMESTAMP_WITHOUT_TIME_ZONE:
-      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
-        lcv.child = new HeapTimestampVector(total);
-        ((HeapTimestampVector) lcv.child).reset();
-        for (int i = 0; i < valueList.size(); i++) {
-          if (valueList.get(i) == null) {
-            ((HeapTimestampVector) lcv.child).setNullAt(i);
-          } else {
-            ((HeapTimestampVector) lcv.child)
-                .setTimestamp(i, ((List<TimestampData>) valueList).get(i));
-          }
-        }
-        break;
-      case DECIMAL:
-        PrimitiveType.PrimitiveTypeName primitiveTypeName =
-            descriptor.getPrimitiveType().getPrimitiveTypeName();
-        switch (primitiveTypeName) {
-          case INT32:
-            lcv.child = new ParquetDecimalVector(new HeapIntVector(total));
-            ((HeapIntVector) ((ParquetDecimalVector) lcv.child).getVector()).reset();
-            for (int i = 0; i < valueList.size(); i++) {
-              if (valueList.get(i) == null) {
-                ((HeapIntVector) ((ParquetDecimalVector) lcv.child).getVector())
-                    .setNullAt(i);
-              } else {
-                ((HeapIntVector) ((ParquetDecimalVector) lcv.child).getVector())
-                    .vector[i] =
-                    ((List<Integer>) valueList).get(i);
-              }
-            }
-            break;
-          case INT64:
-            lcv.child = new ParquetDecimalVector(new HeapLongVector(total));
-            ((HeapLongVector) ((ParquetDecimalVector) lcv.child).getVector()).reset();
-            for (int i = 0; i < valueList.size(); i++) {
-              if (valueList.get(i) == null) {
-                ((HeapLongVector) ((ParquetDecimalVector) lcv.child).getVector())
-                    .setNullAt(i);
-              } else {
-                ((HeapLongVector) ((ParquetDecimalVector) lcv.child).getVector())
-                    .vector[i] =
-                    ((List<Long>) valueList).get(i);
-              }
-            }
-            break;
-          default:
-            lcv.child = new ParquetDecimalVector(new HeapBytesVector(total));
-            ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).getVector()).reset();
-            for (int i = 0; i < valueList.size(); i++) {
-              byte[] src = ((List<byte[]>) valueList).get(i);
-              if (valueList.get(i) == null) {
-                ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).getVector())
-                    .setNullAt(i);
-              } else {
-                ((HeapBytesVector) ((ParquetDecimalVector) lcv.child).getVector())
-                    .appendBytes(i, src, 0, src.length);
-              }
-            }
-            break;
-        }
-        break;
-      default:
-        throw new RuntimeException("Unsupported type in the list: " + type);
-    }
-  }
-}
-
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java
deleted file mode 100644
index 437c186a9366..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/ArrayGroupReader.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector.reader;
-
-import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-import org.apache.hudi.table.format.cow.vector.HeapArrayGroupColumnVector;
-
-import java.io.IOException;
-
-/**
- * Array of a Group type (Array, Map, Row, etc.) {@link ColumnReader}.
- */
-public class ArrayGroupReader implements ColumnReader<WritableColumnVector> {
-
-  private final ColumnReader<WritableColumnVector> fieldReader;
-
-  public ArrayGroupReader(ColumnReader<WritableColumnVector> fieldReader) {
-    this.fieldReader = fieldReader;
-  }
-
-  @Override
-  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
-    HeapArrayGroupColumnVector rowColumnVector = (HeapArrayGroupColumnVector) vector;
-
-    fieldReader.readToVector(readNumber, rowColumnVector.vector);
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
deleted file mode 100644
index ee65dd22c436..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/MapColumnReader.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector.reader;
-
-import org.apache.hudi.table.format.cow.vector.HeapMapColumnVector;
-
-import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.columnar.vector.heap.AbstractHeapVector;
-import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-
-import java.io.IOException;
-
-/**
- * Map {@link ColumnReader}.
- */
-public class MapColumnReader implements ColumnReader<WritableColumnVector> {
-
-  private final ArrayColumnReader keyReader;
-  private final ColumnReader<WritableColumnVector> valueReader;
-
-  public MapColumnReader(
-      ArrayColumnReader keyReader, ColumnReader<WritableColumnVector> valueReader) {
-    this.keyReader = keyReader;
-    this.valueReader = valueReader;
-  }
-
-  @Override
-  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
-    HeapMapColumnVector mapColumnVector = (HeapMapColumnVector) vector;
-    AbstractHeapVector keyArrayColumnVector = (AbstractHeapVector) (mapColumnVector.getKeys());
-    keyReader.readToVector(readNumber, mapColumnVector.getKeys());
-    valueReader.readToVector(readNumber, mapColumnVector.getValues());
-    for (int i = 0; i < keyArrayColumnVector.getLen(); i++) {
-      if (keyArrayColumnVector.isNullAt(i)) {
-        mapColumnVector.setNullAt(i);
-      }
-    }
-  }
-}
-
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
index 6ae1cc9492ec..b4f04b20c477 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedColumnReader.java
@@ -114,9 +114,8 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
       ParquetField child = children.get(i);
       if (child == null) {
         // Hudi schema-evolution: the logical field is not present in the Parquet file. The slot
-        // vector is expected to be pre-populated with nulls by the caller (lands in a follow-up
-        // PR that rewires ParquetSplitReaderUtil); keep it as is and skip contributing to the
-        // level stream.
+        // vector was pre-populated with nulls by ParquetSplitReaderUtil#createWritableColumnVector
+        // (ROW branch); keep it as is and skip contributing to the level stream.
         finalChildrenVectors[i] = childrenVectors[i];
         continue;
       }
@@ -148,11 +147,11 @@ public class NestedColumnReader implements ColumnReader<WritableColumnVector> {
       setFieldNullFlag(rowPosition.getIsNull(), heapRowVector);
     }
 
-    // Hudi-specific: collapse a present row whose every child is null into a null row. The
-    // legacy RowColumnReader did this so that a SQL value like `row(null, null)` round-trips
-    // to NULL on read; preserve it here for backward compatibility. Diverges from Flink 2.1,
-    // which would surface it as Row(null, null). Mirrored by the integration test
-    // ITTestHoodieDataSource#testParquetNullChildColumnsRowTypes.
+    // Hudi-specific: collapse a present row whose every child is null into a null row, so that a
+    // SQL value like `row(null, null)` round-trips to NULL on read. This was the behaviour of the
+    // legacy RowColumnReader (deleted alongside the Dremel rewire) and existing Hudi tables rely
+    // on it. Diverges from Flink 2.1, which would surface it as Row(null, null). Pinned by the
+    // integration test ITTestHoodieDataSource#testParquetNullChildColumnsRowTypes.
     int rowCount = rowPosition.getPositionsCount();
     for (int j = 0; j < rowCount; j++) {
       if (heapRowVector.isNullAt(j)) {
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
index 3f0aefe2af74..72809db1b2ce 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/NestedPrimitiveColumnReader.java
@@ -71,8 +71,8 @@ import static org.apache.parquet.column.ValuesType.VALUES;
  * and the Hudi-local {@link ParquetDecimalVector} / {@link LevelDelegation} / {@link IntArrayList}
  * imports are changed; the algorithm is untouched. The companion Hudi-specific {@code
  * Int64TimestampColumnReader} / {@code FixedLenBytesColumnReader} behaviours stay at the leaf-
- * reader creation boundary in {@code ParquetSplitReaderUtil} (lands in a follow-up PR), not inside
- * this class — keeping it a faithful copy of upstream.
+ * reader creation boundary in {@code ParquetSplitReaderUtil}, not inside this class — keeping it
+ * a faithful copy of upstream.
  */
 public class NestedPrimitiveColumnReader implements ColumnReader<WritableColumnVector> {
   private static final Logger LOG = LoggerFactory.getLogger(NestedPrimitiveColumnReader.class);
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java b/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
deleted file mode 100644
index 79b50487f13c..000000000000
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/main/java/org/apache/hudi/table/format/cow/vector/reader/RowColumnReader.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hudi.table.format.cow.vector.reader;
-
-import org.apache.hudi.table.format.cow.vector.HeapRowColumnVector;
-
-import org.apache.flink.formats.parquet.vector.reader.ColumnReader;
-import org.apache.flink.table.data.columnar.vector.writable.WritableColumnVector;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Row {@link ColumnReader}.
- */
-public class RowColumnReader implements ColumnReader<WritableColumnVector> {
-
-  private final List<ColumnReader> fieldReaders;
-
-  public RowColumnReader(List<ColumnReader> fieldReaders) {
-    this.fieldReaders = fieldReaders;
-  }
-
-  @Override
-  public void readToVector(int readNumber, WritableColumnVector vector) throws IOException {
-    HeapRowColumnVector rowColumnVector = (HeapRowColumnVector) vector;
-    WritableColumnVector[] vectors = rowColumnVector.vectors;
-    // row vector null array
-    boolean[] isNulls = new boolean[readNumber];
-    for (int i = 0; i < vectors.length; i++) {
-      fieldReaders.get(i).readToVector(readNumber, vectors[i]);
-
-      for (int j = 0; j < readNumber; j++) {
-        if (i == 0) {
-          isNulls[j] = vectors[i].isNullAt(j);
-        } else {
-          isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
-        }
-        if (i == vectors.length - 1 && isNulls[j]) {
-          // rowColumnVector[j] is null only when all fields[j] of rowColumnVector[j] is
-          // null
-          rowColumnVector.setNullAt(j);
-        }
-      }
-    }
-  }
-}
diff --git a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
index 4b48fdd37460..7cb62824e854 100644
--- a/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
+++ b/hudi-flink-datasource/hudi-flink1.18.x/src/test/java/org/apache/hudi/table/format/cow/vector/TestHeapColumnVectorAccessors.java
@@ -35,8 +35,9 @@ import static org.junit.jupiter.api.Assertions.assertSame;
  *
  * <p>The accessors are wrappers over the existing public fields so legacy callers continue to
  * work. These tests exist solely to pin down that wrapper contract — runtime correctness of the
- * Dremel-style read path is exercised end-to-end by integration tests once
- * {@code ParquetSplitReaderUtil} is wired up to {@code NestedColumnReader} in a follow-up PR.
+ * Dremel-style read path is exercised end-to-end by integration tests in
+ * {@code ITTestHoodieDataSource} (testParquetComplexTypes / testParquetComplexNestedRowTypes /
+ * testParquetArrayMapOfRowTypes / testParquetNullChildColumnsRowTypes).
  */
 class TestHeapColumnVectorAccessors {
 

Reply via email to