This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new fd7ab438a [parquet] Child vector of complex type should arrange 
elements compactly (like orc) (#3883)
fd7ab438a is described below

commit fd7ab438aaf17d9fc94c7499f2e490c122fbbcac
Author: yuzelin <[email protected]>
AuthorDate: Thu Aug 8 22:15:33 2024 +0800

    [parquet] Child vector of complex type should arrange elements compactly 
(like orc) (#3883)
---
 .../format/parquet/reader/NestedColumnReader.java  | 110 ++-
 .../format/parquet/reader/NestedPositionUtil.java  |  84 +--
 .../reader/NestedPrimitiveColumnReader.java        |  71 +-
 .../format/parquet/ParquetColumnVectorTest.java    | 782 +++++++++++++++++++++
 .../format/parquet/ParquetReadWriteTest.java       |  57 +-
 5 files changed, 957 insertions(+), 147 deletions(-)

diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
index e39005800..c89c77603 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -26,7 +26,6 @@ import org.apache.paimon.data.columnar.heap.HeapRowVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.parquet.position.CollectionPosition;
 import org.apache.paimon.format.parquet.position.LevelDelegation;
-import org.apache.paimon.format.parquet.position.RowPosition;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.format.parquet.type.ParquetGroupField;
 import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
@@ -41,6 +40,7 @@ import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.page.PageReadStore;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,20 +86,26 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
 
     @Override
     public void readToVector(int readNumber, WritableColumnVector vector) 
throws IOException {
-        readData(field, readNumber, vector, false);
+        readData(field, readNumber, vector, false, false, false);
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readData(
-            ParquetField field, int readNumber, ColumnVector vector, boolean 
inside)
+            ParquetField field,
+            int readNumber,
+            ColumnVector vector,
+            boolean inside,
+            boolean readRowField,
+            boolean readMapKey)
             throws IOException {
         if (field.getType() instanceof RowType) {
             return readRow((ParquetGroupField) field, readNumber, vector, 
inside);
         } else if (field.getType() instanceof MapType || field.getType() 
instanceof MultisetType) {
-            return readMap((ParquetGroupField) field, readNumber, vector, 
inside);
+            return readMap((ParquetGroupField) field, readNumber, vector, 
inside, readRowField);
         } else if (field.getType() instanceof ArrayType) {
-            return readArray((ParquetGroupField) field, readNumber, vector, 
inside);
+            return readArray((ParquetGroupField) field, readNumber, vector, 
inside, readRowField);
         } else {
-            return readPrimitive((ParquetPrimitiveField) field, readNumber, 
vector);
+            return readPrimitive(
+                    (ParquetPrimitiveField) field, readNumber, vector, 
readRowField, readMapKey);
         }
     }
 
@@ -107,45 +113,60 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
             ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
             throws IOException {
         HeapRowVector heapRowVector = (HeapRowVector) vector;
-        LevelDelegation levelDelegation = null;
+        LevelDelegation longest = null;
         List<ParquetField> children = field.getChildren();
         WritableColumnVector[] childrenVectors = heapRowVector.getFields();
         WritableColumnVector[] finalChildrenVectors =
                 new WritableColumnVector[childrenVectors.length];
         for (int i = 0; i < children.size(); i++) {
             Pair<LevelDelegation, WritableColumnVector> tuple =
-                    readData(children.get(i), readNumber, childrenVectors[i], 
true);
-            levelDelegation = tuple.getLeft();
+                    readData(children.get(i), readNumber, childrenVectors[i], 
true, true, false);
+            LevelDelegation current = tuple.getLeft();
+            if (longest == null) {
+                longest = current;
+            } else if (current.getDefinitionLevel().length > 
longest.getDefinitionLevel().length) {
+                longest = current;
+            }
             finalChildrenVectors[i] = tuple.getRight();
         }
-        if (levelDelegation == null) {
+        if (longest == null) {
             throw new RuntimeException(
                     String.format("Row field does not have any children: %s.", 
field));
         }
 
-        RowPosition rowPosition =
-                NestedPositionUtil.calculateRowOffsets(
-                        field,
-                        levelDelegation.getDefinitionLevel(),
-                        levelDelegation.getRepetitionLevel());
+        int len = ((AbstractHeapVector) finalChildrenVectors[0]).getLen();
+        boolean[] isNull = new boolean[len];
+        Arrays.fill(isNull, true);
+        boolean hasNull = false;
+        for (int i = 0; i < len; i++) {
+            for (WritableColumnVector child : finalChildrenVectors) {
+                isNull[i] = isNull[i] && child.isNullAt(i);
+            }
+            if (isNull[i]) {
+                hasNull = true;
+            }
+        }
 
         // If row was inside the structure, then we need to renew the vector 
to reset the
         // capacity.
         if (inside) {
-            heapRowVector =
-                    new HeapRowVector(rowPosition.getPositionsCount(), 
finalChildrenVectors);
+            heapRowVector = new HeapRowVector(len, finalChildrenVectors);
         } else {
             heapRowVector.setFields(finalChildrenVectors);
         }
 
-        if (rowPosition.getIsNull() != null) {
-            setFieldNullFalg(rowPosition.getIsNull(), heapRowVector);
+        if (hasNull) {
+            setFieldNullFlag(isNull, heapRowVector);
         }
-        return Pair.of(levelDelegation, heapRowVector);
+        return Pair.of(longest, heapRowVector);
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readMap(
-            ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
+            ParquetGroupField field,
+            int readNumber,
+            ColumnVector vector,
+            boolean inside,
+            boolean readRowField)
             throws IOException {
         HeapMapVector mapVector = (HeapMapVector) vector;
         mapVector.reset();
@@ -155,9 +176,21 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                 "Maps must have two type parameters, found %s",
                 children.size());
         Pair<LevelDelegation, WritableColumnVector> keyTuple =
-                readData(children.get(0), readNumber, 
mapVector.getKeyColumnVector(), true);
+                readData(
+                        children.get(0),
+                        readNumber,
+                        mapVector.getKeyColumnVector(),
+                        true,
+                        false,
+                        true);
         Pair<LevelDelegation, WritableColumnVector> valueTuple =
-                readData(children.get(1), readNumber, 
mapVector.getValueColumnVector(), true);
+                readData(
+                        children.get(1),
+                        readNumber,
+                        mapVector.getValueColumnVector(),
+                        true,
+                        false,
+                        false);
 
         LevelDelegation levelDelegation = keyTuple.getLeft();
 
@@ -165,7 +198,8 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                 NestedPositionUtil.calculateCollectionOffsets(
                         field,
                         levelDelegation.getDefinitionLevel(),
-                        levelDelegation.getRepetitionLevel());
+                        levelDelegation.getRepetitionLevel(),
+                        readRowField);
 
         // If map was inside the structure, then we need to renew the vector 
to reset the
         // capacity.
@@ -181,7 +215,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
         }
 
         if (collectionPosition.getIsNull() != null) {
-            setFieldNullFalg(collectionPosition.getIsNull(), mapVector);
+            setFieldNullFlag(collectionPosition.getIsNull(), mapVector);
         }
 
         mapVector.setLengths(collectionPosition.getLength());
@@ -191,7 +225,11 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readArray(
-            ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
+            ParquetGroupField field,
+            int readNumber,
+            ColumnVector vector,
+            boolean inside,
+            boolean readRowField)
             throws IOException {
         HeapArrayVector arrayVector = (HeapArrayVector) vector;
         arrayVector.reset();
@@ -201,14 +239,15 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                 "Arrays must have a single type parameter, found %s",
                 children.size());
         Pair<LevelDelegation, WritableColumnVector> tuple =
-                readData(children.get(0), readNumber, arrayVector.getChild(), 
true);
+                readData(children.get(0), readNumber, arrayVector.getChild(), 
true, false, false);
 
         LevelDelegation levelDelegation = tuple.getLeft();
         CollectionPosition collectionPosition =
                 NestedPositionUtil.calculateCollectionOffsets(
                         field,
                         levelDelegation.getDefinitionLevel(),
-                        levelDelegation.getRepetitionLevel());
+                        levelDelegation.getRepetitionLevel(),
+                        readRowField);
 
         // If array was inside the structure, then we need to renew the vector 
to reset the
         // capacity.
@@ -219,7 +258,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
         }
 
         if (collectionPosition.getIsNull() != null) {
-            setFieldNullFalg(collectionPosition.getIsNull(), arrayVector);
+            setFieldNullFlag(collectionPosition.getIsNull(), arrayVector);
         }
         arrayVector.setLengths(collectionPosition.getLength());
         arrayVector.setOffsets(collectionPosition.getOffsets());
@@ -227,7 +266,12 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
-            ParquetPrimitiveField field, int readNumber, ColumnVector vector) 
throws IOException {
+            ParquetPrimitiveField field,
+            int readNumber,
+            ColumnVector vector,
+            boolean readRowField,
+            boolean readMapKey)
+            throws IOException {
         ColumnDescriptor descriptor = field.getDescriptor();
         NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
         if (reader == null) {
@@ -237,7 +281,9 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                             pages.getPageReader(descriptor),
                             isUtcTimestamp,
                             descriptor.getPrimitiveType(),
-                            field.getType());
+                            field.getType(),
+                            readRowField,
+                            readMapKey);
             columnReaders.put(descriptor, reader);
         }
         WritableColumnVector writableColumnVector =
@@ -245,7 +291,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
         return Pair.of(reader.getLevelDelegation(), writableColumnVector);
     }
 
-    private static void setFieldNullFalg(boolean[] nullFlags, 
AbstractHeapVector vector) {
+    private static void setFieldNullFlag(boolean[] nullFlags, 
AbstractHeapVector vector) {
         for (int index = 0; index < vector.getLen() && index < 
nullFlags.length; index++) {
             if (nullFlags[index]) {
                 vector.setNullAt(index);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
index 5f0757c23..b43169a40 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.format.parquet.reader;
 
 import org.apache.paimon.format.parquet.position.CollectionPosition;
-import org.apache.paimon.format.parquet.position.RowPosition;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.utils.BooleanArrayList;
 import org.apache.paimon.utils.LongArrayList;
@@ -29,50 +28,6 @@ import static java.lang.String.format;
 /** Utils to calculate nested type position. */
 public class NestedPositionUtil {
 
-    /**
-     * Calculate row offsets according to column's max repetition level, 
definition level, value's
-     * repetition level and definition level. Each row has three situation:
-     * <li>Row is not defined,because it's optional parent fields is null, 
this is decided by its
-     *     parent's repetition level
-     * <li>Row is null
-     * <li>Row is defined and not empty.
-     *
-     * @param field field that contains the row column message include max 
repetition level and
-     *     definition level.
-     * @param fieldRepetitionLevels int array with each value's repetition 
level.
-     * @param fieldDefinitionLevels int array with each value's definition 
level.
-     * @return {@link RowPosition} contains collections row count and isNull 
array.
-     */
-    public static RowPosition calculateRowOffsets(
-            ParquetField field, int[] fieldDefinitionLevels, int[] 
fieldRepetitionLevels) {
-        int rowDefinitionLevel = field.getDefinitionLevel();
-        int rowRepetitionLevel = field.getRepetitionLevel();
-        int nullValuesCount = 0;
-        BooleanArrayList nullRowFlags = new BooleanArrayList(0);
-        for (int i = 0; i < fieldDefinitionLevels.length; i++) {
-            if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
-                throw new IllegalStateException(
-                        format(
-                                "In parquet's row type field repetition level 
should not larger than row's repetition level. "
-                                        + "Row repetition level is %s, row 
field repetition level is %s.",
-                                rowRepetitionLevel, fieldRepetitionLevels[i]));
-            }
-
-            if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
-                // current row is defined and not empty
-                nullRowFlags.add(false);
-            } else {
-                // current row is null
-                nullRowFlags.add(true);
-                nullValuesCount++;
-            }
-        }
-        if (nullValuesCount == 0) {
-            return new RowPosition(null, fieldDefinitionLevels.length);
-        }
-        return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
-    }
-
     /**
      * Calculate the collection's offsets according to column's max repetition 
level, definition
      * level, value's repetition level and definition level. Each collection 
(Array or Map) has four
@@ -92,7 +47,10 @@ public class NestedPositionUtil {
      *     array.
      */
     public static CollectionPosition calculateCollectionOffsets(
-            ParquetField field, int[] definitionLevels, int[] 
repetitionLevels) {
+            ParquetField field,
+            int[] definitionLevels,
+            int[] repetitionLevels,
+            boolean readRowField) {
         int collectionDefinitionLevel = field.getDefinitionLevel();
         int collectionRepetitionLevel = field.getRepetitionLevel() + 1;
         int offset = 0;
@@ -105,36 +63,42 @@ public class NestedPositionUtil {
         for (int i = 0;
                 i < definitionLevels.length;
                 i = getNextCollectionStartIndex(repetitionLevels, 
collectionRepetitionLevel, i)) {
-            valueCount++;
             if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
-                boolean isNull =
-                        isOptionalFieldValueNull(definitionLevels[i], 
collectionDefinitionLevel);
-                nullCollectionFlags.add(isNull);
-                nullValuesCount += isNull ? 1 : 0;
                 // definitionLevels[i] > collectionDefinitionLevel  => 
Collection is defined and not
                 // empty
                 // definitionLevels[i] == collectionDefinitionLevel => 
Collection is defined but
                 // empty
+                // definitionLevels[i] == collectionDefinitionLevel - 1 => 
Collection is defined but
+                // null
                 if (definitionLevels[i] > collectionDefinitionLevel) {
+                    nullCollectionFlags.add(false);
                     emptyCollectionFlags.add(false);
                     offset += getCollectionSize(repetitionLevels, 
collectionRepetitionLevel, i + 1);
                 } else if (definitionLevels[i] == collectionDefinitionLevel) {
-                    offset++;
+                    nullCollectionFlags.add(false);
                     emptyCollectionFlags.add(true);
+                    // don't increase offset for empty values
                 } else {
-                    offset++;
+                    nullCollectionFlags.add(true);
+                    nullValuesCount++;
+                    // 1. don't increase offset for null values
+                    // 2. offsets and emptyCollectionFlags are meaningless for 
null values, but they
+                    // must be set at each index for calculating lengths later
                     emptyCollectionFlags.add(false);
                 }
                 offsets.add(offset);
-            } else {
-                // when definitionLevels[i] < collectionDefinitionLevel - 1, 
it means the collection
-                // is
-                // not defined, but we need to regard it as null to avoid 
getting value wrong.
+                valueCount++;
+            } else if (definitionLevels[i] == collectionDefinitionLevel - 2 && 
readRowField) {
+                // row field should store null value
                 nullCollectionFlags.add(true);
                 nullValuesCount++;
-                offsets.add(++offset);
                 emptyCollectionFlags.add(false);
+
+                offsets.add(offset);
+                valueCount++;
             }
+            // else when definitionLevels[i] < collectionDefinitionLevel - 1, 
it means the
+            // collection is not defined, just ignore it
         }
         long[] offsetsArray = offsets.toArray();
         long[] length = 
calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
@@ -145,10 +109,6 @@ public class NestedPositionUtil {
                 nullCollectionFlags.toArray(), offsetsArray, length, 
valueCount);
     }
 
-    public static boolean isOptionalFieldValueNull(int definitionLevel, int 
maxDefinitionLevel) {
-        return definitionLevel == maxDefinitionLevel - 1;
-    }
-
     public static long[] calculateLengthByOffsets(
             boolean[] collectionIsEmpty, long[] arrayOffsets) {
         LongArrayList lengthList = new LongArrayList(arrayOffsets.length);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index 0b0d89d4d..7ee33a0bb 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -73,12 +73,14 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
     private final ColumnDescriptor descriptor;
     private final Type type;
     private final DataType dataType;
+    private final boolean readRowField;
+    private final boolean readMapKey;
     /** The dictionary, if this column has dictionary encoding. */
     private final ParquetDataColumnReader dictionary;
     /** Maximum definition level for this column. */
     private final int maxDefLevel;
 
-    private boolean isUtcTimestamp;
+    private final boolean isUtcTimestamp;
 
     /** Total number of values read. */
     private long valuesRead;
@@ -109,14 +111,16 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
 
     private boolean isFirstRow = true;
 
-    private Object lastValue;
+    private final LastValueContainer lastValue = new LastValueContainer();
 
     public NestedPrimitiveColumnReader(
             ColumnDescriptor descriptor,
             PageReader pageReader,
             boolean isUtcTimestamp,
             Type parquetType,
-            DataType dataType)
+            DataType dataType,
+            boolean readRowField,
+            boolean readMapKey)
             throws IOException {
         this.descriptor = descriptor;
         this.type = parquetType;
@@ -124,6 +128,8 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         this.maxDefLevel = descriptor.getMaxDefinitionLevel();
         this.isUtcTimestamp = isUtcTimestamp;
         this.dataType = dataType;
+        this.readRowField = readRowField;
+        this.readMapKey = readMapKey;
 
         DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
         if (dictionaryPage != null) {
@@ -168,8 +174,10 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         // repeated type need two loops to read data.
         while (!eof && index < readNumber) {
             do {
-                valueList.add(lastValue);
-                valueIndex++;
+                if (!lastValue.shouldSkip) {
+                    valueList.add(lastValue.value);
+                    valueIndex++;
+                }
             } while (readValue() && (repetitionLevel != 0));
             index++;
         }
@@ -187,6 +195,27 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         return new LevelDelegation(repetition, definition);
     }
 
+    /**
+     * An ARRAY[ARRAY[INT]] Example: {[[0, null], [1], [], null], [], null} => 
[5, 4, 5, 3, 2, 1, 0]
+     *
+     * <ul>
+     *   <li>definitionLevel == maxDefLevel => not null value
+     *   <li>definitionLevel == maxDefLevel - 1 => null value
+     *   <li>definitionLevel == maxDefLevel - 2 => empty set, skip
+     *   <li>definitionLevel == maxDefLevel - 3 => null set, skip
+     *   <li>definitionLevel == maxDefLevel - 4 => empty outer set, skip
+     *   <li>definitionLevel == maxDefLevel - 5 => null outer set, skip
+     *   <li>... skip
+     * </ul>
+     *
+     * <p>When (definitionLevel <= maxDefLevel - 2) we skip the value because 
children ColumnVector
+     * for OrcArrayColumnVector don't contain empty and null set value. Stay 
consistent here.
+     *
+     * <p>For MAP, the value vector is the same as ARRAY. But the key vector 
isn't nullable, so just
+     * read value when definitionLevel == maxDefLevel.
+     *
+     * <p>For ROW, RowColumnVector still get null value when definitionLevel 
== maxDefLevel - 2.
+     */
     private boolean readValue() throws IOException {
         int left = readPageIfNeed();
         if (left > 0) {
@@ -196,12 +225,24 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             if (definitionLevel == maxDefLevel) {
                 if (isCurrentPageDictionaryEncoded) {
                     int dictionaryId = dataColumn.readValueDictionaryId();
-                    lastValue = dictionaryDecodeValue(dataType, dictionaryId);
+                    lastValue.setValue(dictionaryDecodeValue(dataType, 
dictionaryId));
                 } else {
-                    lastValue = readPrimitiveTypedRow(dataType);
+                    lastValue.setValue(readPrimitiveTypedRow(dataType));
                 }
             } else {
-                lastValue = null;
+                if (readMapKey) {
+                    lastValue.skip();
+                } else {
+                    if (definitionLevel == maxDefLevel - 1) {
+                        // null value inner set
+                        lastValue.setValue(null);
+                    } else if (definitionLevel == maxDefLevel - 2 && 
readRowField) {
+                        lastValue.setValue(null);
+                    } else {
+                        // current set is empty or null
+                        lastValue.skip();
+                    }
+                }
             }
             return true;
         } else {
@@ -641,4 +682,18 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             return 0;
         }
     }
+
+    private static class LastValueContainer {
+        protected boolean shouldSkip;
+        protected Object value;
+
+        protected void setValue(Object value) {
+            this.value = value;
+            this.shouldSkip = false;
+        }
+
+        protected void skip() {
+            this.shouldSkip = true;
+        }
+    }
 }
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
new file mode 100644
index 000000000..e08e4f3ae
--- /dev/null
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
@@ -0,0 +1,782 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataGetters;
+import org.apache.paimon.data.GenericArray;
+import org.apache.paimon.data.GenericMap;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalArray;
+import org.apache.paimon.data.InternalMap;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ArrayColumnVector;
+import org.apache.paimon.data.columnar.BytesColumnVector;
+import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.IntColumnVector;
+import org.apache.paimon.data.columnar.MapColumnVector;
+import org.apache.paimon.data.columnar.RowColumnVector;
+import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.format.FormatReaderContext;
+import org.apache.paimon.format.FormatWriter;
+import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.StringUtils;
+
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Validate the {@link ColumnVector}s read by Parquet format. */
+public class ParquetColumnVectorTest {
+
+    private @TempDir java.nio.file.Path tempDir;
+
+    private static final Random RND = ThreadLocalRandom.current();
+    private static final BiFunction<ColumnVector, Integer, String> 
BYTES_COLUMN_VECTOR_STRING_FUNC =
+            (cv, i) ->
+                    cv.isNullAt(i)
+                            ? "null"
+                            : new String(((BytesColumnVector) 
cv).getBytes(i).getBytes());
+
+    @Test
+    public void testArrayString() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("array_string", 
DataTypes.ARRAY(DataTypes.STRING()))
+                        .build();
+
+        int numRows = RND.nextInt(5) + 5;
+        ArrayObject expectedData = new ArrayObject();
+        List<InternalRow> rows = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; i++) {
+            if (RND.nextBoolean()) {
+                expectedData.add(null);
+                rows.add(GenericRow.of((Object) null));
+                continue;
+            }
+
+            int currentSize = RND.nextInt(5);
+            List<String> currentStringArray =
+                    IntStream.range(0, currentSize)
+                            .mapToObj(idx -> randomString())
+                            .collect(Collectors.toList());
+            expectedData.add(currentStringArray);
+            GenericArray array =
+                    new GenericArray(
+                            
currentStringArray.stream().map(BinaryString::fromString).toArray());
+            rows.add(GenericRow.of(array));
+        }
+
+        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
+        VectorizedColumnBatch batch = iterator.batch();
+        InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
+
+        // validate row by row
+        for (int i = 0; i < numRows; i++) {
+            InternalRow row = iterator.next();
+            expectedData.validateRow(row, i, getter);
+        }
+        assertThat(iterator.next()).isNull();
+
+        // validate ColumnVector
+        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
+        expectedData.validateColumnVector(arrayColumnVector, getter);
+
+        expectedData.validateInnerChild(
+                arrayColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+
+        iterator.releaseBatch();
+    }
+
+    @Test
+    public void testArrayArrayString() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field(
+                                "array_array_string",
+                                
DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING())))
+                        .build();
+
+        int numRows = RND.nextInt(5) + 5;
+        ArrayArrayObject expectedData = new ArrayArrayObject();
+        List<InternalRow> rows = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; i++) {
+            // outer null row
+            if (RND.nextBoolean()) {
+                expectedData.add(null);
+                rows.add(GenericRow.of((Object) null));
+                continue;
+            }
+
+            int arraySize = RND.nextInt(5);
+            ArrayObject arrayObject = new ArrayObject();
+            GenericArray[] innerArrays = new GenericArray[arraySize];
+            for (int aIdx = 0; aIdx < arraySize; aIdx++) {
+                // inner null array
+                if (RND.nextBoolean()) {
+                    arrayObject.add(null);
+                    innerArrays[aIdx] = null;
+                    continue;
+                }
+
+                int arrayStringSize = RND.nextInt(5);
+                List<String> currentStringArray =
+                        IntStream.range(0, arrayStringSize)
+                                .mapToObj(idx -> randomString())
+                                .collect(Collectors.toList());
+                arrayObject.add(currentStringArray);
+                innerArrays[aIdx] =
+                        new GenericArray(
+                                currentStringArray.stream()
+                                        .map(BinaryString::fromString)
+                                        .toArray());
+            }
+            expectedData.add(arrayObject);
+            rows.add(GenericRow.of(new GenericArray(innerArrays)));
+        }
+
+        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
+        VectorizedColumnBatch batch = iterator.batch();
+        InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
+
+        // validate row by row
+        for (int i = 0; i < numRows; i++) {
+            InternalRow row = iterator.next();
+            expectedData.validateRow(row, i, getter);
+        }
+        assertThat(iterator.next()).isNull();
+
+        // validate column vector
+        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
+
+        expectedData.validateOuterArray(arrayColumnVector, getter);
+
+        ArrayColumnVector innerArrayColumnVector =
+                (ArrayColumnVector) arrayColumnVector.getColumnVector();
+        expectedData.validateInnerArray(innerArrayColumnVector, getter);
+
+        ColumnVector columnVector = innerArrayColumnVector.getColumnVector();
+        expectedData.validateInnerChild(columnVector, 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+    }
+
+    @Test
+    public void testMapString() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("map_string", DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()))
+                        .build();
+
+        int numRows = RND.nextInt(5) + 5;
+        ArrayObject expectedData = new ArrayObject();
+        List<InternalRow> rows = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; i++) {
+            if (RND.nextBoolean()) {
+                expectedData.add(null);
+                rows.add(GenericRow.of((Object) null));
+                continue;
+            }
+
+            int currentSize = RND.nextInt(5);
+            List<String> currentStringArray =
+                    IntStream.range(0, currentSize)
+                            .mapToObj(idx -> randomString())
+                            .collect(Collectors.toList());
+            expectedData.add(currentStringArray);
+            Map<Integer, BinaryString> map = new HashMap<>();
+            for (int idx = 0; idx < currentSize; idx++) {
+                map.put(idx, 
BinaryString.fromString(currentStringArray.get(idx)));
+            }
+            rows.add(GenericRow.of(new GenericMap(map)));
+        }
+
+        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
+        VectorizedColumnBatch batch = iterator.batch();
+        InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
+
+        // validate row by row
+        for (int i = 0; i < numRows; i++) {
+            InternalRow row = iterator.next();
+            assertThat(row).isNotNull();
+            List<?> expected = expectedData.data.get(i);
+            if (expected == null) {
+                assertThat(row.isNullAt(0)).isTrue();
+            } else {
+                InternalMap map = row.getMap(0);
+                validateMapKeyArray(map.keyArray());
+                InternalArray valueArray = map.valueArray();
+                expectedData.validateNonNullArray(expected, valueArray, 
getter);
+            }
+        }
+        assertThat(iterator.next()).isNull();
+
+        // validate ColumnVector
+        MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
+        IntColumnVector keyColumnVector = (IntColumnVector) 
mapColumnVector.getKeyColumnVector();
+        validateMapKeyColumnVector(keyColumnVector, expectedData);
+        ColumnVector valueColumnVector = 
mapColumnVector.getValueColumnVector();
+        expectedData.validateInnerChild(valueColumnVector, 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+
+        iterator.releaseBatch();
+    }
+
+    @Test
+    public void testMapArrayString() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field(
+                                "map_array_string",
+                                DataTypes.MAP(DataTypes.INT(), 
DataTypes.ARRAY(DataTypes.STRING())))
+                        .build();
+
+        int numRows = RND.nextInt(5) + 5;
+        ArrayArrayObject expectedData = new ArrayArrayObject();
+        List<InternalRow> rows = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; i++) {
+            // outer null row
+            if (RND.nextBoolean()) {
+                expectedData.add(null);
+                rows.add(GenericRow.of((Object) null));
+                continue;
+            }
+
+            int mapSize = RND.nextInt(5);
+            ArrayObject arrayObject = new ArrayObject();
+            Map<Integer, GenericArray> map = new HashMap<>();
+            for (int mIdx = 0; mIdx < mapSize; mIdx++) {
+                // null array value
+                if (RND.nextBoolean()) {
+                    arrayObject.add(null);
+                    map.put(mIdx, null);
+                    continue;
+                }
+
+                int currentSize = RND.nextInt(5);
+                List<String> currentStringArray =
+                        IntStream.range(0, currentSize)
+                                .mapToObj(idx -> randomString())
+                                .collect(Collectors.toList());
+                arrayObject.add(currentStringArray);
+
+                map.put(
+                        mIdx,
+                        new GenericArray(
+                                currentStringArray.stream()
+                                        .map(BinaryString::fromString)
+                                        .toArray()));
+            }
+            expectedData.add(arrayObject);
+            rows.add(GenericRow.of(new GenericMap(map)));
+        }
+
+        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
+        VectorizedColumnBatch batch = iterator.batch();
+        InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
+
+        // validate row by row
+        for (int i = 0; i < numRows; i++) {
+            InternalRow row = iterator.next();
+            assertThat(row).isNotNull();
+            ArrayObject expected = expectedData.data.get(i);
+            if (expected == null) {
+                assertThat(row.isNullAt(0)).isTrue();
+            } else {
+                InternalMap map = row.getMap(0);
+                validateMapKeyArray(map.keyArray());
+                InternalArray valueArray = map.valueArray();
+                expected.validateArrayGetter(valueArray, getter);
+            }
+        }
+        assertThat(iterator.next()).isNull();
+
+        // validate column vector
+        MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
+        IntColumnVector keyColumnVector = (IntColumnVector) 
mapColumnVector.getKeyColumnVector();
+        validateMapKeyColumnVector(keyColumnVector, expectedData);
+
+        ArrayColumnVector valueColumnVector =
+                (ArrayColumnVector) mapColumnVector.getValueColumnVector();
+        expectedData.validateInnerArray(valueColumnVector, getter);
+        expectedData.validateInnerChild(
+                valueColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+
+        iterator.releaseBatch();
+    }
+
+    private void validateMapKeyArray(InternalArray keyArray) {
+        for (int i = 0; i < keyArray.size(); i++) {
+            assertThat(keyArray.getInt(i)).isEqualTo(i);
+        }
+    }
+
+    private void validateMapKeyColumnVector(
+            IntColumnVector columnVector, ArrayObject expectedData) {
+        int idx = 0;
+        for (List<?> values : expectedData.data) {
+            if (values != null) {
+                for (int i = 0; i < values.size(); i++) {
+                    assertThat(columnVector.getInt(idx++)).isEqualTo(i);
+                }
+            }
+        }
+    }
+
+    private void validateMapKeyColumnVector(
+            IntColumnVector columnVector, ArrayArrayObject expectedData) {
+        int idx = 0;
+        for (ArrayObject arrayObject : expectedData.data) {
+            if (arrayObject != null) {
+                for (int i = 0; i < arrayObject.data.size(); i++) {
+                    assertThat(columnVector.getInt(idx++)).isEqualTo(i);
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testRow() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field(
+                                "row",
+                                RowType.builder()
+                                        .field("f0", DataTypes.INT())
+                                        .field("f1", 
DataTypes.ARRAY(DataTypes.STRING()))
+                                        .build())
+                        .build();
+
+        int numRows = RND.nextInt(5) + 5;
+        ArrayObject expectedData = new ArrayObject();
+        List<InternalRow> rows = new ArrayList<>(numRows);
+        List<Integer> f0 = new ArrayList<>();
+        for (int i = 0; i < numRows; i++) {
+            if (RND.nextBoolean()) {
+                expectedData.add(null);
+                f0.add(null);
+                rows.add(GenericRow.of((Object) null));
+                continue;
+            }
+
+            if (RND.nextInt(5) == 0) {
+                // set f1 null
+                expectedData.add(null);
+                f0.add(i);
+                rows.add(GenericRow.of(GenericRow.of(i, null)));
+                continue;
+            }
+
+            int currentSize = RND.nextInt(5);
+            List<String> currentStringArray =
+                    IntStream.range(0, currentSize)
+                            .mapToObj(idx -> randomString())
+                            .collect(Collectors.toList());
+            expectedData.add(currentStringArray);
+            f0.add(i);
+            GenericArray array =
+                    new GenericArray(
+                            
currentStringArray.stream().map(BinaryString::fromString).toArray());
+            rows.add(GenericRow.of(GenericRow.of(i, array)));
+        }
+
+        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
+        VectorizedColumnBatch batch = iterator.batch();
+        InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
+
+        // validate row by row
+        for (int i = 0; i < numRows; i++) {
+            InternalRow row = iterator.next();
+            assertThat(row).isNotNull();
+            if (f0.get(i) == null && expectedData.data.get(i) == null) {
+                assertThat(row.isNullAt(0)).isTrue();
+            } else {
+                InternalRow innerRow = row.getRow(0, 2);
+
+                if (f0.get(i) == null) {
+                    assertThat(innerRow.isNullAt(0)).isTrue();
+                } else {
+                    assertThat(innerRow.getInt(0)).isEqualTo(f0.get(i));
+                }
+
+                if (expectedData.data.get(i) == null) {
+                    assertThat(innerRow.isNullAt(1)).isTrue();
+                } else {
+                    expectedData.validateNonNullArray(
+                            expectedData.data.get(i), innerRow.getArray(1), 
getter);
+                }
+            }
+        }
+        assertThat(iterator.next()).isNull();
+
+        // validate ColumnVector
+        RowColumnVector rowColumnVector = (RowColumnVector) batch.columns[0];
+        VectorizedColumnBatch innerBatch = rowColumnVector.getBatch();
+
+        IntColumnVector intColumnVector = (IntColumnVector) 
innerBatch.columns[0];
+        for (int i = 0; i < numRows; i++) {
+            Integer f0Value = f0.get(i);
+            if (f0Value == null) {
+                assertThat(intColumnVector.isNullAt(i)).isTrue();
+            } else {
+                assertThat(intColumnVector.getInt(i)).isEqualTo(f0Value);
+            }
+        }
+
+        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
innerBatch.columns[1];
+        expectedData.validateColumnVector(arrayColumnVector, getter);
+        expectedData.validateInnerChild(
+                arrayColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+
+        iterator.releaseBatch();
+    }
+
+    @Test
+    public void testArrayRowArray() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field(
+                                "array_row_array",
+                                DataTypes.ARRAY(
+                                        RowType.builder()
+                                                .field("f0", 
DataTypes.STRING())
+                                                .field("f1", 
DataTypes.ARRAY(DataTypes.INT()))
+                                                .build()))
+                        .build();
+
+        List<InternalRow> rows = new ArrayList<>(4);
+        List<BinaryString> f0 = new ArrayList<>(3);
+        for (int i = 0; i < 3; i++) {
+            f0.add(BinaryString.fromString(randomString()));
+        }
+
+        GenericRow row00 = GenericRow.of(f0.get(0), new GenericArray(new 
Object[] {0, null}));
+        GenericRow row01 = GenericRow.of(f0.get(1), new GenericArray(new 
Object[] {}));
+        GenericArray array0 = new GenericArray(new GenericRow[] {row00, 
row01});
+        rows.add(GenericRow.of(array0));
+
+        rows.add(GenericRow.of((Object) null));
+
+        GenericRow row20 = GenericRow.of(f0.get(2), new GenericArray(new 
Object[] {1}));
+        GenericArray array2 = new GenericArray(new GenericRow[] {row20});
+        rows.add(GenericRow.of(array2));
+
+        GenericArray array3 = new GenericArray(new GenericRow[] {});
+        rows.add(GenericRow.of(array3));
+
+        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
+        VectorizedColumnBatch batch = iterator.batch();
+
+        // validate row by row
+        InternalRow row0 = iterator.next();
+        // array0
+        InternalArray array = row0.getArray(0);
+        assertThat(array.size()).isEqualTo(2);
+        // row00
+        InternalRow row = array.getRow(0, 1);
+        if (f0.get(0) == null) {
+            assertThat(row.isNullAt(0)).isTrue();
+        } else {
+            assertThat(row.getString(0)).isEqualTo(f0.get(0));
+        }
+        InternalArray innerArray = row.getArray(1);
+        assertThat(innerArray.size()).isEqualTo(2);
+        assertThat(innerArray.getInt(0)).isEqualTo(0);
+        assertThat(innerArray.isNullAt(1)).isTrue();
+        // row01
+        row = array.getRow(1, 1);
+        if (f0.get(1) == null) {
+            assertThat(row.isNullAt(0)).isTrue();
+        } else {
+            assertThat(row.getString(0)).isEqualTo(f0.get(1));
+        }
+        innerArray = row.getArray(1);
+        assertThat(innerArray.size()).isEqualTo(0);
+
+        InternalRow row1 = iterator.next();
+        assertThat(row1.isNullAt(0)).isTrue();
+
+        InternalRow row2 = iterator.next();
+        // array2
+        array = row2.getArray(0);
+        assertThat(array.size()).isEqualTo(1);
+        // row20
+        row = array.getRow(0, 1);
+        if (f0.get(2) == null) {
+            assertThat(row.isNullAt(0)).isTrue();
+        } else {
+            assertThat(row.getString(0)).isEqualTo(f0.get(2));
+        }
+        innerArray = row.getArray(1);
+        assertThat(innerArray.size()).isEqualTo(1);
+        assertThat(innerArray.getInt(0)).isEqualTo(1);
+
+        InternalRow row3 = iterator.next();
+        // array2
+        array = row3.getArray(0);
+        assertThat(array.size()).isEqualTo(0);
+
+        assertThat(iterator.next()).isNull();
+
+        // validate ColumnVector
+        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
+        assertThat(arrayColumnVector.isNullAt(0)).isFalse();
+        assertThat(arrayColumnVector.isNullAt(1)).isTrue();
+        assertThat(arrayColumnVector.isNullAt(2)).isFalse();
+        assertThat(arrayColumnVector.isNullAt(3)).isFalse();
+
+        RowColumnVector rowColumnVector = (RowColumnVector) 
arrayColumnVector.getColumnVector();
+        BytesColumnVector f0Vector = (BytesColumnVector) 
rowColumnVector.getBatch().columns[0];
+        for (int i = 0; i < 3; i++) {
+            BinaryString s = f0.get(i);
+            if (s == null) {
+                assertThat(f0Vector.isNullAt(i)).isTrue();
+            } else {
+                assertThat(new 
String(f0Vector.getBytes(i).getBytes())).isEqualTo(s.toString());
+            }
+        }
+        ArrayColumnVector f1Vector = (ArrayColumnVector) 
rowColumnVector.getBatch().columns[1];
+        InternalArray internalArray0 = f1Vector.getArray(0);
+        assertThat(internalArray0.size()).isEqualTo(2);
+        assertThat(internalArray0.isNullAt(0)).isFalse();
+        assertThat(internalArray0.isNullAt(1)).isTrue();
+
+        InternalArray internalArray1 = f1Vector.getArray(1);
+        assertThat(internalArray1.size()).isEqualTo(0);
+
+        InternalArray internalArray2 = f1Vector.getArray(2);
+        assertThat(internalArray2.size()).isEqualTo(1);
+        assertThat(internalArray2.isNullAt(0)).isFalse();
+
+        IntColumnVector intColumnVector = (IntColumnVector) 
f1Vector.getColumnVector();
+        assertThat(intColumnVector.getInt(0)).isEqualTo(0);
+        assertThat(intColumnVector.isNullAt(1)).isTrue();
+        assertThat(intColumnVector.getInt(2)).isEqualTo(1);
+
+        iterator.releaseBatch();
+    }
+
+    private VectorizedRecordIterator createVectorizedRecordIterator(
+            RowType rowType, List<InternalRow> rows) throws IOException {
+        Path path = new Path(tempDir.toString(), UUID.randomUUID().toString());
+        LocalFileIO fileIO = LocalFileIO.create();
+
+        ParquetWriterFactory writerFactory =
+                new ParquetWriterFactory(new RowDataParquetBuilder(rowType, 
new Options()));
+        FormatWriter writer = 
writerFactory.create(fileIO.newOutputStream(path, false), "zstd");
+        for (InternalRow row : rows) {
+            writer.addElement(row);
+        }
+        writer.flush();
+        writer.finish();
+
+        ParquetReaderFactory readerFactory =
+                new ParquetReaderFactory(new Options(), rowType, 1024, 
FilterCompat.NOOP);
+
+        RecordReader<InternalRow> reader =
+                readerFactory.createReader(
+                        new FormatReaderContext(fileIO, path, 
fileIO.getFileSize(path)));
+
+        RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
+        return (VectorizedRecordIterator) iterator;
+    }
+
+    @Nullable
+    private String randomString() {
+        return RND.nextInt(5) == 0 ? null : StringUtils.getRandomString(RND, 
1, 10);
+    }
+
+    /** Store generated data of ARRAY[STRING] and provide validated methods. */
+    private static class ArrayObject {
+
+        public final List<List<?>> data;
+
+        public ArrayObject() {
+            this.data = new ArrayList<>();
+        }
+
+        public void add(List<?> objects) {
+            data.add(objects);
+        }
+
+        public void validateRow(InternalRow row, int i, 
InternalArray.ElementGetter getter) {
+            assertThat(row).isNotNull();
+            List<?> expected = data.get(i);
+            if (expected == null) {
+                assertThat(row.isNullAt(0)).isTrue();
+            } else {
+                validateNonNullArray(expected, row.getArray(0), getter);
+            }
+        }
+
+        public void validateColumnVector(
+                ArrayColumnVector arrayColumnVector, 
InternalArray.ElementGetter getter) {
+            for (int i = 0; i < data.size(); i++) {
+                List<?> expected = data.get(i);
+                if (expected == null) {
+                    assertThat(arrayColumnVector.isNullAt(i)).isTrue();
+                } else {
+                    validateNonNullArray(expected, 
arrayColumnVector.getArray(i), getter);
+                }
+            }
+        }
+
+        public void validateArrayGetter(DataGetters arrays, 
InternalArray.ElementGetter getter) {
+            for (int i = 0; i < data.size(); i++) {
+                List<?> expected = data.get(i);
+                if (expected == null) {
+                    assertThat(arrays.isNullAt(i)).isTrue();
+                } else {
+                    validateNonNullArray(expected, arrays.getArray(i), getter);
+                }
+            }
+        }
+
+        public void validateNonNullArray(
+                List<?> expected, InternalArray array, 
InternalArray.ElementGetter getter) {
+            int arraySize = array.size();
+            assertThat(arraySize).isEqualTo(expected.size());
+            for (int i = 0; i < arraySize; i++) {
+                String value = String.valueOf(getter.getElementOrNull(array, 
i));
+                assertThat(value).isEqualTo(String.valueOf(expected.get(i)));
+            }
+        }
+
+        public void validateInnerChild(
+                ColumnVector columnVector, BiFunction<ColumnVector, Integer, 
String> stringGetter) {
+            // it doesn't contain null rows
+            List<?> expandedData =
+                    data.stream()
+                            .filter(Objects::nonNull)
+                            .flatMap(Collection::stream)
+                            .collect(Collectors.toList());
+            for (int i = 0; i < expandedData.size(); i++) {
+                assertThat(stringGetter.apply(columnVector, i))
+                        .isEqualTo(String.valueOf(expandedData.get(i)));
+            }
+        }
+    }
+
+    /** Store generated data of ARRAY[ARRAY[STRING]] and provide validated 
methods. */
+    private static class ArrayArrayObject {
+
+        public final List<ArrayObject> data;
+
+        public ArrayArrayObject() {
+            this.data = new ArrayList<>();
+        }
+
+        public void add(@Nullable ArrayObject arrayObjects) {
+            data.add(arrayObjects);
+        }
+
+        private List<List<?>> expand() {
+            // it doesn't contain null rows of outer array
+            return data.stream()
+                    .filter(Objects::nonNull)
+                    .flatMap(i -> i.data.stream())
+                    .collect(Collectors.toList());
+        }
+
+        private List<?> expandInner() {
+            // it doesn't contain null rows of outer and inner array
+            return expand().stream()
+                    .filter(Objects::nonNull)
+                    .flatMap(Collection::stream)
+                    .collect(Collectors.toList());
+        }
+
+        public void validateRow(InternalRow row, int i, 
InternalArray.ElementGetter getter) {
+            assertThat(row).isNotNull();
+            ArrayObject expectedArray = data.get(i);
+            if (expectedArray == null) {
+                assertThat(row.isNullAt(0)).isTrue();
+            } else {
+                InternalArray outerArray = row.getArray(0);
+                
assertThat(outerArray.size()).isEqualTo(expectedArray.data.size());
+                expectedArray.validateArrayGetter(outerArray, getter);
+            }
+        }
+
+        public void validateOuterArray(
+                ArrayColumnVector arrayColumnVector,
+                InternalArray.ElementGetter innerElementGetter) {
+            for (int i = 0; i < data.size(); i++) {
+                ArrayObject expected = data.get(i);
+                if (expected == null) {
+                    assertThat(arrayColumnVector.isNullAt(i)).isTrue();
+                } else {
+                    InternalArray array = arrayColumnVector.getArray(i);
+                    expected.validateArrayGetter(array, innerElementGetter);
+                }
+            }
+        }
+
+        public void validateInnerArray(
+                ArrayColumnVector arrayColumnVector,
+                InternalArray.ElementGetter innerElementGetter) {
+            List<List<?>> expandedData = expand();
+            for (int i = 0; i < expandedData.size(); i++) {
+                List<?> expected = expandedData.get(i);
+                if (expected == null) {
+                    assertThat(arrayColumnVector.isNullAt(i)).isTrue();
+                } else {
+                    InternalArray array = arrayColumnVector.getArray(i);
+                    int size = array.size();
+                    assertThat(size).isEqualTo(expected.size());
+                    for (int j = 0; j < size; j++) {
+                        
assertThat(String.valueOf(innerElementGetter.getElementOrNull(array, j)))
+                                .isEqualTo(String.valueOf(expected.get(j)));
+                    }
+                }
+            }
+        }
+
+        public void validateInnerChild(
+                ColumnVector columnVector, BiFunction<ColumnVector, Integer, 
String> stringGetter) {
+            List<?> expandedData = expandInner();
+            for (int i = 0; i < expandedData.size(); i++) {
+                assertThat(stringGetter.apply(columnVector, i))
+                        .isEqualTo(String.valueOf(expandedData.get(i)));
+            }
+        }
+    }
+}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index a3aa1f85b..57fd0235b 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -34,6 +34,7 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BooleanType;
@@ -454,9 +455,7 @@ public class ParquetReadWriteTest {
                 format.createReader(
                         new FormatReaderContext(
                                 new LocalFileIO(), path, new 
LocalFileIO().getFileSize(path)));
-        List<InternalRow> results = new ArrayList<>(1283);
-        reader.forEachRemaining(results::add);
-        compareNestedRow(rows, results);
+        compareNestedRow(rows, new RecordReaderIterator<>(reader));
     }
 
     @Test
@@ -525,41 +524,9 @@ public class ParquetReadWriteTest {
                     }
                     Integer v = expected.get(cnt.get());
                     if (v == null) {
-                        assertThat(row.isNullAt(0)).isTrue();
-                        assertThat(row.isNullAt(1)).isTrue();
-                        assertThat(row.isNullAt(2)).isTrue();
-                        assertThat(row.isNullAt(3)).isTrue();
-                        assertThat(row.isNullAt(4)).isTrue();
-                        assertThat(row.isNullAt(5)).isTrue();
-                        assertThat(row.isNullAt(6)).isTrue();
-                        assertThat(row.isNullAt(7)).isTrue();
-                        assertThat(row.isNullAt(8)).isTrue();
-                        assertThat(row.isNullAt(9)).isTrue();
-                        assertThat(row.isNullAt(10)).isTrue();
-                        assertThat(row.isNullAt(11)).isTrue();
-                        assertThat(row.isNullAt(12)).isTrue();
-                        assertThat(row.isNullAt(13)).isTrue();
-                        assertThat(row.isNullAt(14)).isTrue();
-                        assertThat(row.isNullAt(15)).isTrue();
-                        assertThat(row.isNullAt(16)).isTrue();
-                        assertThat(row.isNullAt(17)).isTrue();
-                        assertThat(row.isNullAt(18)).isTrue();
-                        assertThat(row.isNullAt(19)).isTrue();
-                        assertThat(row.isNullAt(20)).isTrue();
-                        assertThat(row.isNullAt(21)).isTrue();
-                        assertThat(row.isNullAt(22)).isTrue();
-                        assertThat(row.isNullAt(23)).isTrue();
-                        assertThat(row.isNullAt(24)).isTrue();
-                        assertThat(row.isNullAt(25)).isTrue();
-                        assertThat(row.isNullAt(26)).isTrue();
-                        assertThat(row.isNullAt(27)).isTrue();
-                        assertThat(row.isNullAt(28)).isTrue();
-                        assertThat(row.isNullAt(29)).isTrue();
-                        assertThat(row.isNullAt(30)).isTrue();
-                        assertThat(row.isNullAt(31)).isTrue();
-                        assertThat(row.isNullAt(32)).isTrue();
-                        assertThat(row.isNullAt(33)).isTrue();
-                        assertThat(row.isNullAt(34)).isTrue();
+                        for (int i = 0; i < 35; i++) {
+                            assertThat(row.isNullAt(i)).isTrue();
+                        }
                     } else {
                         assertThat(row.getString(0)).hasToString("" + v);
                         assertThat(row.getBoolean(1)).isEqualTo(v % 2 == 0);
@@ -826,7 +793,6 @@ public class ParquetReadWriteTest {
                 row1.add(0, i);
                 Group row2 = rowList.addGroup(0);
                 row2.add(0, i + 1);
-                f4.addGroup(0);
 
                 // add ROW<`f0` ARRAY<ROW<`b` ARRAY<ARRAY<INT>>, `c` INT>>, 
`f1` INT>>
                 Group f5 = row.addGroup("f5");
@@ -835,7 +801,6 @@ public class ParquetReadWriteTest {
                 Group insideArray = insideRow.addGroup(0);
                 createParquetDoubleNestedArray(insideArray, i);
                 insideRow.add(1, i);
-                arrayRow.addGroup(0);
                 f5.add(1, i);
                 writer.write(row);
             }
@@ -873,12 +838,12 @@ public class ParquetReadWriteTest {
         }
     }
 
-    private void compareNestedRow(List<InternalRow> rows, List<InternalRow> 
results) {
-        Assertions.assertEquals(rows.size(), results.size());
+    private void compareNestedRow(
+            List<InternalRow> rows, RecordReaderIterator<InternalRow> 
iterator) throws Exception {
+        for (InternalRow origin : rows) {
+            assertThat(iterator.hasNext()).isTrue();
+            InternalRow result = iterator.next();
 
-        for (InternalRow result : results) {
-            int index = result.getInt(0);
-            InternalRow origin = rows.get(index);
             Assertions.assertEquals(origin.getInt(0), result.getInt(0));
 
             // int[]
@@ -967,6 +932,8 @@ public class ParquetReadWriteTest {
                     result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1));
             Assertions.assertEquals(origin.getRow(5, 2).getInt(1), 
result.getRow(5, 2).getInt(1));
         }
+        assertThat(iterator.hasNext()).isFalse();
+        iterator.close();
     }
 
     private void fillWithMap(Map<String, String> map, InternalMap internalMap, 
int index) {

Reply via email to