This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b50040ab8e [Parquet] Revert parquet patch #3883 which tries to construct column vectors like orc (#4745)
b50040ab8e is described below

commit b50040ab8ebceef6c15bfe394dfcd15fbe997e17
Author: yuzelin <[email protected]>
AuthorDate: Sat Dec 21 16:39:23 2024 +0800

    [Parquet] Revert parquet patch #3883 which tries to construct column vectors like orc (#4745)
---
 .../arrow/converter/ArrowBatchConverterTest.java   |  64 +++--
 .../format/parquet/ParquetReaderFactory.java       |   7 +-
 .../format/parquet/position/RowPosition.java       |  40 +++
 .../format/parquet/reader/NestedColumnReader.java  | 126 ++++------
 .../format/parquet/reader/NestedPositionUtil.java  |  87 +++++--
 .../reader/NestedPrimitiveColumnReader.java        |  70 +-----
 .../format/parquet/reader/RowColumnReader.java     |  59 +++++
 .../format/parquet/ParquetColumnVectorTest.java    | 270 +++++++++++++++------
 .../format/parquet/ParquetReadWriteTest.java       |  32 ++-
 9 files changed, 482 insertions(+), 273 deletions(-)

diff --git 
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
 
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
index aef589d912..96470b72ee 100644
--- 
a/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
+++ 
b/paimon-arrow/src/test/java/org/apache/paimon/arrow/converter/ArrowBatchConverterTest.java
@@ -167,7 +167,7 @@ public class ArrowBatchConverterTest {
             rows.add(GenericRow.of(randomRowValues));
         }
 
-        return getRecordIterator(PRIMITIVE_TYPE, rows, projection);
+        return getRecordIterator(PRIMITIVE_TYPE, rows, projection, true);
     }
 
     @TestTemplate
@@ -244,7 +244,7 @@ public class ArrowBatchConverterTest {
         }
 
         RecordReader.RecordIterator<InternalRow> iterator =
-                getRecordIterator(nestedArrayType, rows);
+                getRecordIterator(nestedArrayType, rows, null, 
testMode.equals("per_row"));
         try (RootAllocator allocator = new RootAllocator()) {
             VectorSchemaRoot vsr = 
ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
             ArrowBatchConverter arrowWriter = createArrowWriter(iterator, 
nestedArrayType, vsr);
@@ -308,7 +308,8 @@ public class ArrowBatchConverterTest {
             expectedMaps.add(map1);
         }
 
-        RecordReader.RecordIterator<InternalRow> iterator = 
getRecordIterator(nestedMapType, rows);
+        RecordReader.RecordIterator<InternalRow> iterator =
+                getRecordIterator(nestedMapType, rows, null, 
testMode.equals("per_row"));
         try (RootAllocator allocator = new RootAllocator()) {
             VectorSchemaRoot vsr = 
ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
             ArrowBatchConverter arrowWriter = createArrowWriter(iterator, 
nestedMapType, vsr);
@@ -365,7 +366,11 @@ public class ArrowBatchConverterTest {
         InternalRow row3 = GenericRow.of(new GenericMap(map3));
 
         RecordReader.RecordIterator<InternalRow> iterator =
-                getRecordIterator(nestedMapRowType, Arrays.asList(row1, row2, 
row3));
+                getRecordIterator(
+                        nestedMapRowType,
+                        Arrays.asList(row1, row2, row3),
+                        null,
+                        testMode.equals("per_row"));
         try (RootAllocator allocator = new RootAllocator()) {
             VectorSchemaRoot vsr = 
ArrowUtils.createVectorSchemaRoot(nestedMapRowType, allocator);
             ArrowBatchConverter arrowWriter = createArrowWriter(iterator, 
nestedMapRowType, vsr);
@@ -423,7 +428,8 @@ public class ArrowBatchConverterTest {
             rows.add(GenericRow.of(GenericRow.of(randomRowValues)));
         }
 
-        RecordReader.RecordIterator<InternalRow> iterator = 
getRecordIterator(nestedRowType, rows);
+        RecordReader.RecordIterator<InternalRow> iterator =
+                getRecordIterator(nestedRowType, rows, null, 
testMode.equals("per_row"));
         try (RootAllocator allocator = new RootAllocator()) {
             VectorSchemaRoot vsr = 
ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
             ArrowBatchConverter arrowWriter = createArrowWriter(iterator, 
nestedRowType, vsr);
@@ -464,7 +470,8 @@ public class ArrowBatchConverterTest {
             rows.add(GenericRow.of(i));
         }
 
-        RecordReader.RecordIterator<InternalRow> iterator = 
getRecordIterator(rowType, rows);
+        RecordReader.RecordIterator<InternalRow> iterator =
+                getRecordIterator(rowType, rows, null, true);
         try (RootAllocator allocator = new RootAllocator()) {
             VectorSchemaRoot vsr = ArrowUtils.createVectorSchemaRoot(rowType, 
allocator);
             ArrowBatchConverter arrowWriter = createArrowWriter(iterator, 
rowType, vsr);
@@ -515,7 +522,7 @@ public class ArrowBatchConverterTest {
         int[] projection = readEmpty ? new int[0] : null;
         RecordReader.RecordIterator<InternalRow> iterator =
                 getApplyDeletionFileRecordIterator(
-                        rowType, rows, deleted, 
Collections.singletonList("pk"), projection);
+                        rowType, rows, deleted, 
Collections.singletonList("pk"), projection, true);
         if (readEmpty) {
             testReadEmpty(iterator, numRows - deleted.size());
         } else {
@@ -588,7 +595,12 @@ public class ArrowBatchConverterTest {
         Set<Integer> deleted = getDeletedPks(numRows);
         RecordReader.RecordIterator<InternalRow> iterator =
                 getApplyDeletionFileRecordIterator(
-                        nestedArrayType, rows, deleted, 
Collections.singletonList("pk"), null);
+                        nestedArrayType,
+                        rows,
+                        deleted,
+                        Collections.singletonList("pk"),
+                        null,
+                        testMode.equals("per_row"));
         try (RootAllocator allocator = new RootAllocator()) {
             Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
             VectorSchemaRoot vsr = 
ArrowUtils.createVectorSchemaRoot(nestedArrayType, allocator);
@@ -666,7 +678,12 @@ public class ArrowBatchConverterTest {
         Set<Integer> deleted = getDeletedPks(numRows);
         RecordReader.RecordIterator<InternalRow> iterator =
                 getApplyDeletionFileRecordIterator(
-                        nestedMapType, rows, deleted, 
Collections.singletonList("pk"), null);
+                        nestedMapType,
+                        rows,
+                        deleted,
+                        Collections.singletonList("pk"),
+                        null,
+                        testMode.equals("per_row"));
         try (RootAllocator allocator = new RootAllocator()) {
             Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
             VectorSchemaRoot vsr = 
ArrowUtils.createVectorSchemaRoot(nestedMapType, allocator);
@@ -735,7 +752,12 @@ public class ArrowBatchConverterTest {
         Set<Integer> deleted = getDeletedPks(numRows);
         RecordReader.RecordIterator<InternalRow> iterator =
                 getApplyDeletionFileRecordIterator(
-                        nestedRowType, rows, deleted, 
Collections.singletonList("pk"), null);
+                        nestedRowType,
+                        rows,
+                        deleted,
+                        Collections.singletonList("pk"),
+                        null,
+                        testMode.equals("per_row"));
         try (RootAllocator allocator = new RootAllocator()) {
             Set<Integer> expectedPks = getExpectedPks(numRows, deleted);
             VectorSchemaRoot vsr = 
ArrowUtils.createVectorSchemaRoot(nestedRowType, allocator);
@@ -803,14 +825,15 @@ public class ArrowBatchConverterTest {
     }
 
     private RecordReader.RecordIterator<InternalRow> getRecordIterator(
-            RowType rowType, List<InternalRow> rows) throws Exception {
-        return getRecordIterator(rowType, rows, null);
-    }
-
-    private RecordReader.RecordIterator<InternalRow> getRecordIterator(
-            RowType rowType, List<InternalRow> rows, @Nullable int[] 
projection) throws Exception {
+            RowType rowType,
+            List<InternalRow> rows,
+            @Nullable int[] projection,
+            boolean canTestParquet)
+            throws Exception {
         Map<String, String> options = new HashMap<>();
-        options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : 
"parquet");
+        options.put(
+                CoreOptions.FILE_FORMAT.key(),
+                canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
         FileStoreTable table = createFileStoreTable(rowType, 
Collections.emptyList(), options);
 
         StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
@@ -832,12 +855,15 @@ public class ArrowBatchConverterTest {
             List<GenericRow> rows,
             Set<Integer> deletedPks,
             List<String> primaryKeys,
-            @Nullable int[] projection)
+            @Nullable int[] projection,
+            boolean canTestParquet)
             throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put(CoreOptions.DELETION_VECTORS_ENABLED.key(), "true");
         options.put(CoreOptions.BUCKET.key(), "1");
-        options.put(CoreOptions.FILE_FORMAT.key(), RND.nextBoolean() ? "orc" : 
"parquet");
+        options.put(
+                CoreOptions.FILE_FORMAT.key(),
+                canTestParquet && RND.nextBoolean() ? "parquet" : "orc");
         FileStoreTable table = createFileStoreTable(rowType, primaryKeys, 
options);
 
         StreamTableWrite write = table.newStreamWriteBuilder().newWrite();
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 6f8cab2202..910f3031e0 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -383,13 +383,14 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
 
         /** Advances to the next batch of rows. Returns false if there are no 
more. */
         private boolean nextBatch(ParquetReaderBatch batch) throws IOException 
{
+            if (rowsReturned >= totalRowCount) {
+                return false;
+            }
+
             for (WritableColumnVector v : batch.writableVectors) {
                 v.reset();
             }
             batch.columnarBatch.setNumRows(0);
-            if (rowsReturned >= totalRowCount) {
-                return false;
-            }
             if (rowsReturned == totalCountLoadedSoFar) {
                 readNextRowGroup();
             } else {
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
new file mode 100644
index 0000000000..fb63783490
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/position/RowPosition.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.position;
+
+import javax.annotation.Nullable;
+
+/** To represent struct's position in repeated type. */
+public class RowPosition {
+    @Nullable private final boolean[] isNull;
+    private final int positionsCount;
+
+    public RowPosition(boolean[] isNull, int positionsCount) {
+        this.isNull = isNull;
+        this.positionsCount = positionsCount;
+    }
+
+    public boolean[] getIsNull() {
+        return isNull;
+    }
+
+    public int getPositionsCount() {
+        return positionsCount;
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
index 8f20be2754..3724014e62 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedColumnReader.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.columnar.heap.HeapRowVector;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.parquet.position.CollectionPosition;
 import org.apache.paimon.format.parquet.position.LevelDelegation;
+import org.apache.paimon.format.parquet.position.RowPosition;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.format.parquet.type.ParquetGroupField;
 import org.apache.paimon.format.parquet.type.ParquetPrimitiveField;
@@ -87,26 +88,20 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
 
     @Override
     public void readToVector(int readNumber, WritableColumnVector vector) 
throws IOException {
-        readData(field, readNumber, vector, false, false, false);
+        readData(field, readNumber, vector, false);
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readData(
-            ParquetField field,
-            int readNumber,
-            ColumnVector vector,
-            boolean inside,
-            boolean readRowField,
-            boolean readMapKey)
+            ParquetField field, int readNumber, ColumnVector vector, boolean 
inside)
             throws IOException {
         if (field.getType() instanceof RowType) {
             return readRow((ParquetGroupField) field, readNumber, vector, 
inside);
         } else if (field.getType() instanceof MapType || field.getType() 
instanceof MultisetType) {
-            return readMap((ParquetGroupField) field, readNumber, vector, 
inside, readRowField);
+            return readMap((ParquetGroupField) field, readNumber, vector, 
inside);
         } else if (field.getType() instanceof ArrayType) {
-            return readArray((ParquetGroupField) field, readNumber, vector, 
inside, readRowField);
+            return readArray((ParquetGroupField) field, readNumber, vector, 
inside);
         } else {
-            return readPrimitive(
-                    (ParquetPrimitiveField) field, readNumber, vector, 
readRowField, readMapKey);
+            return readPrimitive((ParquetPrimitiveField) field, readNumber, 
vector);
         }
     }
 
@@ -114,60 +109,64 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
             ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
             throws IOException {
         HeapRowVector heapRowVector = (HeapRowVector) vector;
-        LevelDelegation longest = null;
+        LevelDelegation levelDelegation = null;
         List<ParquetField> children = field.getChildren();
         WritableColumnVector[] childrenVectors = heapRowVector.getFields();
         WritableColumnVector[] finalChildrenVectors =
                 new WritableColumnVector[childrenVectors.length];
+
+        int len = -1;
+        boolean[] isNull = null;
+        boolean hasNull = false;
+
         for (int i = 0; i < children.size(); i++) {
             Pair<LevelDelegation, WritableColumnVector> tuple =
-                    readData(children.get(i), readNumber, childrenVectors[i], 
true, true, false);
-            LevelDelegation current = tuple.getLeft();
-            if (longest == null) {
-                longest = current;
-            } else if (current.getDefinitionLevel().length > 
longest.getDefinitionLevel().length) {
-                longest = current;
-            }
+                    readData(children.get(i), readNumber, childrenVectors[i], 
true);
+            levelDelegation = tuple.getLeft();
             finalChildrenVectors[i] = tuple.getRight();
+
+            WritableColumnVector writableColumnVector = tuple.getRight();
+            if (len == -1) {
+                len = ((ElementCountable) writableColumnVector).getLen();
+                isNull = new boolean[len];
+                Arrays.fill(isNull, true);
+            }
+
+            for (int j = 0; j < len; j++) {
+                isNull[j] = isNull[j] && writableColumnVector.isNullAt(j);
+                if (isNull[j]) {
+                    hasNull = true;
+                }
+            }
         }
-        if (longest == null) {
+        if (levelDelegation == null) {
             throw new RuntimeException(
                     String.format("Row field does not have any children: %s.", 
field));
         }
 
-        int len = ((ElementCountable) finalChildrenVectors[0]).getLen();
-        boolean[] isNull = new boolean[len];
-        Arrays.fill(isNull, true);
-        boolean hasNull = false;
-        for (int i = 0; i < len; i++) {
-            for (WritableColumnVector child : finalChildrenVectors) {
-                isNull[i] = isNull[i] && child.isNullAt(i);
-            }
-            if (isNull[i]) {
-                hasNull = true;
-            }
-        }
+        RowPosition rowPosition =
+                NestedPositionUtil.calculateRowOffsets(
+                        field,
+                        levelDelegation.getDefinitionLevel(),
+                        levelDelegation.getRepetitionLevel());
 
         // If row was inside the structure, then we need to renew the vector 
to reset the
         // capacity.
         if (inside) {
-            heapRowVector = new HeapRowVector(len, finalChildrenVectors);
+            heapRowVector =
+                    new HeapRowVector(rowPosition.getPositionsCount(), 
finalChildrenVectors);
         } else {
             heapRowVector.setFields(finalChildrenVectors);
         }
 
         if (hasNull) {
-            setFieldNullFlag(isNull, heapRowVector);
+            setFieldNullFalg(isNull, heapRowVector);
         }
-        return Pair.of(longest, heapRowVector);
+        return Pair.of(levelDelegation, heapRowVector);
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readMap(
-            ParquetGroupField field,
-            int readNumber,
-            ColumnVector vector,
-            boolean inside,
-            boolean readRowField)
+            ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
             throws IOException {
         HeapMapVector mapVector = (HeapMapVector) vector;
         mapVector.reset();
@@ -177,21 +176,9 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                 "Maps must have two type parameters, found %s",
                 children.size());
         Pair<LevelDelegation, WritableColumnVector> keyTuple =
-                readData(
-                        children.get(0),
-                        readNumber,
-                        mapVector.getKeyColumnVector(),
-                        true,
-                        false,
-                        true);
+                readData(children.get(0), readNumber, 
mapVector.getKeyColumnVector(), true);
         Pair<LevelDelegation, WritableColumnVector> valueTuple =
-                readData(
-                        children.get(1),
-                        readNumber,
-                        mapVector.getValueColumnVector(),
-                        true,
-                        false,
-                        false);
+                readData(children.get(1), readNumber, 
mapVector.getValueColumnVector(), true);
 
         LevelDelegation levelDelegation = keyTuple.getLeft();
 
@@ -199,8 +186,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                 NestedPositionUtil.calculateCollectionOffsets(
                         field,
                         levelDelegation.getDefinitionLevel(),
-                        levelDelegation.getRepetitionLevel(),
-                        readRowField);
+                        levelDelegation.getRepetitionLevel());
 
         // If map was inside the structure, then we need to renew the vector 
to reset the
         // capacity.
@@ -216,7 +202,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
         }
 
         if (collectionPosition.getIsNull() != null) {
-            setFieldNullFlag(collectionPosition.getIsNull(), mapVector);
+            setFieldNullFalg(collectionPosition.getIsNull(), mapVector);
         }
 
         mapVector.setLengths(collectionPosition.getLength());
@@ -226,11 +212,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readArray(
-            ParquetGroupField field,
-            int readNumber,
-            ColumnVector vector,
-            boolean inside,
-            boolean readRowField)
+            ParquetGroupField field, int readNumber, ColumnVector vector, 
boolean inside)
             throws IOException {
         HeapArrayVector arrayVector = (HeapArrayVector) vector;
         arrayVector.reset();
@@ -240,15 +222,14 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                 "Arrays must have a single type parameter, found %s",
                 children.size());
         Pair<LevelDelegation, WritableColumnVector> tuple =
-                readData(children.get(0), readNumber, arrayVector.getChild(), 
true, false, false);
+                readData(children.get(0), readNumber, arrayVector.getChild(), 
true);
 
         LevelDelegation levelDelegation = tuple.getLeft();
         CollectionPosition collectionPosition =
                 NestedPositionUtil.calculateCollectionOffsets(
                         field,
                         levelDelegation.getDefinitionLevel(),
-                        levelDelegation.getRepetitionLevel(),
-                        readRowField);
+                        levelDelegation.getRepetitionLevel());
 
         // If array was inside the structure, then we need to renew the vector 
to reset the
         // capacity.
@@ -259,7 +240,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
         }
 
         if (collectionPosition.getIsNull() != null) {
-            setFieldNullFlag(collectionPosition.getIsNull(), arrayVector);
+            setFieldNullFalg(collectionPosition.getIsNull(), arrayVector);
         }
         arrayVector.setLengths(collectionPosition.getLength());
         arrayVector.setOffsets(collectionPosition.getOffsets());
@@ -267,12 +248,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
     }
 
     private Pair<LevelDelegation, WritableColumnVector> readPrimitive(
-            ParquetPrimitiveField field,
-            int readNumber,
-            ColumnVector vector,
-            boolean readRowField,
-            boolean readMapKey)
-            throws IOException {
+            ParquetPrimitiveField field, int readNumber, ColumnVector vector) 
throws IOException {
         ColumnDescriptor descriptor = field.getDescriptor();
         NestedPrimitiveColumnReader reader = columnReaders.get(descriptor);
         if (reader == null) {
@@ -282,9 +258,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
                             pages,
                             isUtcTimestamp,
                             descriptor.getPrimitiveType(),
-                            field.getType(),
-                            readRowField,
-                            readMapKey);
+                            field.getType());
             columnReaders.put(descriptor, reader);
         }
         WritableColumnVector writableColumnVector =
@@ -292,7 +266,7 @@ public class NestedColumnReader implements 
ColumnReader<WritableColumnVector> {
         return Pair.of(reader.getLevelDelegation(), writableColumnVector);
     }
 
-    private static void setFieldNullFlag(boolean[] nullFlags, 
AbstractHeapVector vector) {
+    private static void setFieldNullFalg(boolean[] nullFlags, 
AbstractHeapVector vector) {
         for (int index = 0; index < vector.getLen() && index < 
nullFlags.length; index++) {
             if (nullFlags[index]) {
                 vector.setNullAt(index);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
index b43169a40b..99892c8437 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPositionUtil.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.format.parquet.reader;
 
 import org.apache.paimon.format.parquet.position.CollectionPosition;
+import org.apache.paimon.format.parquet.position.RowPosition;
 import org.apache.paimon.format.parquet.type.ParquetField;
 import org.apache.paimon.utils.BooleanArrayList;
 import org.apache.paimon.utils.LongArrayList;
@@ -28,6 +29,53 @@ import static java.lang.String.format;
 /** Utils to calculate nested type position. */
 public class NestedPositionUtil {
 
+    /**
+     * Calculate row offsets according to column's max repetition level, 
definition level, value's
+     * repetition level and definition level. Each row has three situation:
+     * <li>Row is not defined,because it's optional parent fields is null, 
this is decided by its
+     *     parent's repetition level
+     * <li>Row is null
+     * <li>Row is defined and not empty.
+     *
+     * @param field field that contains the row column message include max 
repetition level and
+     *     definition level.
+     * @param fieldRepetitionLevels int array with each value's repetition 
level.
+     * @param fieldDefinitionLevels int array with each value's definition 
level.
+     * @return {@link RowPosition} contains collections row count and isNull 
array.
+     */
+    public static RowPosition calculateRowOffsets(
+            ParquetField field, int[] fieldDefinitionLevels, int[] 
fieldRepetitionLevels) {
+        int rowDefinitionLevel = field.getDefinitionLevel();
+        int rowRepetitionLevel = field.getRepetitionLevel();
+        int nullValuesCount = 0;
+        BooleanArrayList nullRowFlags = new BooleanArrayList(0);
+        for (int i = 0; i < fieldDefinitionLevels.length; i++) {
+            // TODO: this is not correct ?
+            //            if (fieldRepetitionLevels[i] > rowRepetitionLevel) {
+            //                throw new IllegalStateException(
+            //                        format(
+            //                                "In parquet's row type field 
repetition level should
+            // not larger than row's repetition level. "
+            //                                        + "Row repetition level 
is %s, row field
+            // repetition level is %s.",
+            //                                rowRepetitionLevel, 
fieldRepetitionLevels[i]));
+            //            }
+
+            if (fieldDefinitionLevels[i] >= rowDefinitionLevel) {
+                // current row is defined and not empty
+                nullRowFlags.add(false);
+            } else {
+                // current row is null
+                nullRowFlags.add(true);
+                nullValuesCount++;
+            }
+        }
+        if (nullValuesCount == 0) {
+            return new RowPosition(null, fieldDefinitionLevels.length);
+        }
+        return new RowPosition(nullRowFlags.toArray(), nullRowFlags.size());
+    }
+
     /**
      * Calculate the collection's offsets according to column's max repetition 
level, definition
      * level, value's repetition level and definition level. Each collection 
(Array or Map) has four
@@ -47,10 +95,7 @@ public class NestedPositionUtil {
      *     array.
      */
     public static CollectionPosition calculateCollectionOffsets(
-            ParquetField field,
-            int[] definitionLevels,
-            int[] repetitionLevels,
-            boolean readRowField) {
+            ParquetField field, int[] definitionLevels, int[] 
repetitionLevels) {
         int collectionDefinitionLevel = field.getDefinitionLevel();
         int collectionRepetitionLevel = field.getRepetitionLevel() + 1;
         int offset = 0;
@@ -63,42 +108,36 @@ public class NestedPositionUtil {
         for (int i = 0;
                 i < definitionLevels.length;
                 i = getNextCollectionStartIndex(repetitionLevels, 
collectionRepetitionLevel, i)) {
+            valueCount++;
             if (definitionLevels[i] >= collectionDefinitionLevel - 1) {
+                boolean isNull =
+                        isOptionalFieldValueNull(definitionLevels[i], 
collectionDefinitionLevel);
+                nullCollectionFlags.add(isNull);
+                nullValuesCount += isNull ? 1 : 0;
                 // definitionLevels[i] > collectionDefinitionLevel  => 
Collection is defined and not
                 // empty
                 // definitionLevels[i] == collectionDefinitionLevel => 
Collection is defined but
                 // empty
-                // definitionLevels[i] == collectionDefinitionLevel - 1 => 
Collection is defined but
-                // null
                 if (definitionLevels[i] > collectionDefinitionLevel) {
-                    nullCollectionFlags.add(false);
                     emptyCollectionFlags.add(false);
                     offset += getCollectionSize(repetitionLevels, 
collectionRepetitionLevel, i + 1);
                 } else if (definitionLevels[i] == collectionDefinitionLevel) {
-                    nullCollectionFlags.add(false);
+                    offset++;
                     emptyCollectionFlags.add(true);
-                    // don't increase offset for empty values
                 } else {
-                    nullCollectionFlags.add(true);
-                    nullValuesCount++;
-                    // 1. don't increase offset for null values
-                    // 2. offsets and emptyCollectionFlags are meaningless for 
null values, but they
-                    // must be set at each index for calculating lengths later
+                    offset++;
                     emptyCollectionFlags.add(false);
                 }
                 offsets.add(offset);
-                valueCount++;
-            } else if (definitionLevels[i] == collectionDefinitionLevel - 2 && 
readRowField) {
-                // row field should store null value
+            } else {
+                // when definitionLevels[i] < collectionDefinitionLevel - 1, 
it means the collection
+                // is
+                // not defined, but we need to regard it as null to avoid 
getting value wrong.
                 nullCollectionFlags.add(true);
                 nullValuesCount++;
+                offsets.add(++offset);
                 emptyCollectionFlags.add(false);
-
-                offsets.add(offset);
-                valueCount++;
             }
-            // else when definitionLevels[i] < collectionDefinitionLevel - 1, 
it means the
-            // collection is not defined, just ignore it
         }
         long[] offsetsArray = offsets.toArray();
         long[] length = 
calculateLengthByOffsets(emptyCollectionFlags.toArray(), offsetsArray);
@@ -109,6 +148,10 @@ public class NestedPositionUtil {
                 nullCollectionFlags.toArray(), offsetsArray, length, 
valueCount);
     }
 
+    public static boolean isOptionalFieldValueNull(int definitionLevel, int 
maxDefinitionLevel) {
+        return definitionLevel == maxDefinitionLevel - 1;
+    }
+
     public static long[] calculateLengthByOffsets(
             boolean[] collectionIsEmpty, long[] arrayOffsets) {
         LongArrayList lengthList = new LongArrayList(arrayOffsets.length);
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
index f0a82a6d71..69b0fa5744 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/NestedPrimitiveColumnReader.java
@@ -75,14 +75,12 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
     private final ColumnDescriptor descriptor;
     private final Type type;
     private final DataType dataType;
-    private final boolean readRowField;
-    private final boolean readMapKey;
     /** The dictionary, if this column has dictionary encoding. */
     private final ParquetDataColumnReader dictionary;
     /** Maximum definition level for this column. */
     private final int maxDefLevel;
 
-    private final boolean isUtcTimestamp;
+    private boolean isUtcTimestamp;
 
     /** If true, the current page is dictionary encoded. */
     private boolean isCurrentPageDictionaryEncoded;
@@ -106,16 +104,14 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
 
     private boolean isFirstRow = true;
 
-    private final LastValueContainer lastValue = new LastValueContainer();
+    private Object lastValue;
 
     public NestedPrimitiveColumnReader(
             ColumnDescriptor descriptor,
             PageReadStore pageReadStore,
             boolean isUtcTimestamp,
             Type parquetType,
-            DataType dataType,
-            boolean readRowField,
-            boolean readMapKey)
+            DataType dataType)
             throws IOException {
         this.descriptor = descriptor;
         this.type = parquetType;
@@ -123,8 +119,6 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         this.maxDefLevel = descriptor.getMaxDefinitionLevel();
         this.isUtcTimestamp = isUtcTimestamp;
         this.dataType = dataType;
-        this.readRowField = readRowField;
-        this.readMapKey = readMapKey;
 
         DictionaryPage dictionaryPage = pageReader.readDictionaryPage();
 
@@ -196,9 +190,8 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             boolean needFilterSkip = pageRowId < rangeStart;
 
             do {
-
-                if (!lastValue.shouldSkip && !needFilterSkip) {
-                    valueList.add(lastValue.value);
+                if (!needFilterSkip) {
+                    valueList.add(lastValue);
                     valueIndex++;
                 }
             } while (readValue() && (repetitionLevel != 0));
@@ -225,27 +218,6 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
         return new LevelDelegation(repetition, definition);
     }
 
-    /**
-     * An ARRAY[ARRAY[INT]] Example: {[[0, null], [1], [], null], [], null} => 
[5, 4, 5, 3, 2, 1, 0]
-     *
-     * <ul>
-     *   <li>definitionLevel == maxDefLevel => not null value
-     *   <li>definitionLevel == maxDefLevel - 1 => null value
-     *   <li>definitionLevel == maxDefLevel - 2 => empty set, skip
-     *   <li>definitionLevel == maxDefLevel - 3 => null set, skip
-     *   <li>definitionLevel == maxDefLevel - 4 => empty outer set, skip
-     *   <li>definitionLevel == maxDefLevel - 5 => null outer set, skip
-     *   <li>... skip
-     * </ul>
-     *
-     * <p>When (definitionLevel <= maxDefLevel - 2) we skip the value because 
children ColumnVector
-     * for OrcArrayColumnVector don't contain empty and null set value. Stay 
consistent here.
-     *
-     * <p>For MAP, the value vector is the same as ARRAY. But the key vector 
isn't nullable, so just
-     * read value when definitionLevel == maxDefLevel.
-     *
-     * <p>For ROW, RowColumnVector still get null value when definitionLevel 
== maxDefLevel - 2.
-     */
     private boolean readValue() throws IOException {
         int left = readPageIfNeed();
         if (left > 0) {
@@ -255,24 +227,12 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             if (definitionLevel == maxDefLevel) {
                 if (isCurrentPageDictionaryEncoded) {
                     int dictionaryId = dataColumn.readValueDictionaryId();
-                    lastValue.setValue(dictionaryDecodeValue(dataType, 
dictionaryId));
+                    lastValue = dictionaryDecodeValue(dataType, dictionaryId);
                 } else {
-                    lastValue.setValue(readPrimitiveTypedRow(dataType));
+                    lastValue = readPrimitiveTypedRow(dataType);
                 }
             } else {
-                if (readMapKey) {
-                    lastValue.skip();
-                } else {
-                    if (definitionLevel == maxDefLevel - 1) {
-                        // null value inner set
-                        lastValue.setValue(null);
-                    } else if (definitionLevel == maxDefLevel - 2 && 
readRowField) {
-                        lastValue.setValue(null);
-                    } else {
-                        // current set is empty or null
-                        lastValue.skip();
-                    }
-                }
+                lastValue = null;
             }
             return true;
         } else {
@@ -721,18 +681,4 @@ public class NestedPrimitiveColumnReader implements 
ColumnReader<WritableColumnV
             return 0;
         }
     }
-
-    private static class LastValueContainer {
-        protected boolean shouldSkip;
-        protected Object value;
-
-        protected void setValue(Object value) {
-            this.value = value;
-            this.shouldSkip = false;
-        }
-
-        protected void skip() {
-            this.shouldSkip = true;
-        }
-    }
 }
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
new file mode 100644
index 0000000000..fa2da03ef3
--- /dev/null
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/reader/RowColumnReader.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.data.columnar.heap.HeapRowVector;
+import org.apache.paimon.data.columnar.writable.WritableColumnVector;
+
+import java.io.IOException;
+import java.util.List;
+
+/** Row {@link ColumnReader}. */
+public class RowColumnReader implements ColumnReader<WritableColumnVector> {
+
+    private final List<ColumnReader> fieldReaders;
+
+    public RowColumnReader(List<ColumnReader> fieldReaders) {
+        this.fieldReaders = fieldReaders;
+    }
+
+    @Override
+    public void readToVector(int readNumber, WritableColumnVector vector) 
throws IOException {
+        HeapRowVector rowVector = (HeapRowVector) vector;
+        WritableColumnVector[] vectors = rowVector.getFields();
+        // row vector null array
+        boolean[] isNulls = new boolean[readNumber];
+        for (int i = 0; i < vectors.length; i++) {
+            fieldReaders.get(i).readToVector(readNumber, vectors[i]);
+
+            for (int j = 0; j < readNumber; j++) {
+                if (i == 0) {
+                    isNulls[j] = vectors[i].isNullAt(j);
+                } else {
+                    isNulls[j] = isNulls[j] && vectors[i].isNullAt(j);
+                }
+                if (i == vectors.length - 1 && isNulls[j]) {
+                    // rowColumnVector[j] is null only when all fields[j] of 
rowColumnVector[j] is
+                    // null
+                    rowVector.setNullAt(j);
+                }
+            }
+        }
+    }
+}
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
index 453e9f2752..0d862c3963 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
@@ -30,8 +30,6 @@ import org.apache.paimon.data.columnar.ArrayColumnVector;
 import org.apache.paimon.data.columnar.BytesColumnVector;
 import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.IntColumnVector;
-import org.apache.paimon.data.columnar.MapColumnVector;
-import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatWriter;
@@ -53,6 +51,7 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -120,12 +119,12 @@ public class ParquetColumnVectorTest {
         assertThat(iterator.next()).isNull();
 
         // validate ColumnVector
-        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
-        expectedData.validateColumnVector(arrayColumnVector, getter);
-
-        expectedData.validateInnerChild(
-                arrayColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
-
+        //        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
+        //        expectedData.validateColumnVector(arrayColumnVector, getter);
+        //
+        //        expectedData.validateInnerChild(
+        //                arrayColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+        //
         iterator.releaseBatch();
     }
 
@@ -188,16 +187,16 @@ public class ParquetColumnVectorTest {
         assertThat(iterator.next()).isNull();
 
         // validate column vector
-        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
-
-        expectedData.validateOuterArray(arrayColumnVector, getter);
-
-        ArrayColumnVector innerArrayColumnVector =
-                (ArrayColumnVector) arrayColumnVector.getColumnVector();
-        expectedData.validateInnerArray(innerArrayColumnVector, getter);
-
-        ColumnVector columnVector = innerArrayColumnVector.getColumnVector();
-        expectedData.validateInnerChild(columnVector, 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+        //        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
+        //
+        //        expectedData.validateOuterArray(arrayColumnVector, getter);
+        //
+        //        ArrayColumnVector innerArrayColumnVector =
+        //                (ArrayColumnVector) 
arrayColumnVector.getColumnVector();
+        //        expectedData.validateInnerArray(innerArrayColumnVector, 
getter);
+        //
+        //        ColumnVector columnVector = 
innerArrayColumnVector.getColumnVector();
+        //        expectedData.validateInnerChild(columnVector, 
BYTES_COLUMN_VECTOR_STRING_FUNC);
     }
 
     @Test
@@ -251,11 +250,13 @@ public class ParquetColumnVectorTest {
         assertThat(iterator.next()).isNull();
 
         // validate ColumnVector
-        MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
-        IntColumnVector keyColumnVector = (IntColumnVector) 
mapColumnVector.getKeyColumnVector();
-        validateMapKeyColumnVector(keyColumnVector, expectedData);
-        ColumnVector valueColumnVector = 
mapColumnVector.getValueColumnVector();
-        expectedData.validateInnerChild(valueColumnVector, 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+        //        MapColumnVector mapColumnVector = (MapColumnVector) 
batch.columns[0];
+        //        IntColumnVector keyColumnVector = (IntColumnVector)
+        // mapColumnVector.getKeyColumnVector();
+        //        validateMapKeyColumnVector(keyColumnVector, expectedData);
+        //        ColumnVector valueColumnVector = 
mapColumnVector.getValueColumnVector();
+        //        expectedData.validateInnerChild(valueColumnVector,
+        // BYTES_COLUMN_VECTOR_STRING_FUNC);
 
         iterator.releaseBatch();
     }
@@ -330,16 +331,17 @@ public class ParquetColumnVectorTest {
         assertThat(iterator.next()).isNull();
 
         // validate column vector
-        MapColumnVector mapColumnVector = (MapColumnVector) batch.columns[0];
-        IntColumnVector keyColumnVector = (IntColumnVector) 
mapColumnVector.getKeyColumnVector();
-        validateMapKeyColumnVector(keyColumnVector, expectedData);
-
-        ArrayColumnVector valueColumnVector =
-                (ArrayColumnVector) mapColumnVector.getValueColumnVector();
-        expectedData.validateInnerArray(valueColumnVector, getter);
-        expectedData.validateInnerChild(
-                valueColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
-
+        //        MapColumnVector mapColumnVector = (MapColumnVector) 
batch.columns[0];
+        //        IntColumnVector keyColumnVector = (IntColumnVector)
+        // mapColumnVector.getKeyColumnVector();
+        //        validateMapKeyColumnVector(keyColumnVector, expectedData);
+        //
+        //        ArrayColumnVector valueColumnVector =
+        //                (ArrayColumnVector) 
mapColumnVector.getValueColumnVector();
+        //        expectedData.validateInnerArray(valueColumnVector, getter);
+        //        expectedData.validateInnerChild(
+        //                valueColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+        //
         iterator.releaseBatch();
     }
 
@@ -448,23 +450,23 @@ public class ParquetColumnVectorTest {
         assertThat(iterator.next()).isNull();
 
         // validate ColumnVector
-        RowColumnVector rowColumnVector = (RowColumnVector) batch.columns[0];
-        VectorizedColumnBatch innerBatch = rowColumnVector.getBatch();
-
-        IntColumnVector intColumnVector = (IntColumnVector) 
innerBatch.columns[0];
-        for (int i = 0; i < numRows; i++) {
-            Integer f0Value = f0.get(i);
-            if (f0Value == null) {
-                assertThat(intColumnVector.isNullAt(i)).isTrue();
-            } else {
-                assertThat(intColumnVector.getInt(i)).isEqualTo(f0Value);
-            }
-        }
-
-        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
innerBatch.columns[1];
-        expectedData.validateColumnVector(arrayColumnVector, getter);
-        expectedData.validateInnerChild(
-                arrayColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
+        //        RowColumnVector rowColumnVector = (RowColumnVector) 
batch.columns[0];
+        //        VectorizedColumnBatch innerBatch = 
rowColumnVector.getBatch();
+        //
+        //        IntColumnVector intColumnVector = (IntColumnVector) 
innerBatch.columns[0];
+        //        for (int i = 0; i < numRows; i++) {
+        //            Integer f0Value = f0.get(i);
+        //            if (f0Value == null) {
+        //                assertThat(intColumnVector.isNullAt(i)).isTrue();
+        //            } else {
+        //                
assertThat(intColumnVector.getInt(i)).isEqualTo(f0Value);
+        //            }
+        //        }
+        //
+        //        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
innerBatch.columns[1];
+        //        expectedData.validateColumnVector(arrayColumnVector, getter);
+        //        expectedData.validateInnerChild(
+        //                arrayColumnVector.getColumnVector(), 
BYTES_COLUMN_VECTOR_STRING_FUNC);
 
         iterator.releaseBatch();
     }
@@ -557,40 +559,148 @@ public class ParquetColumnVectorTest {
         assertThat(iterator.next()).isNull();
 
         // validate ColumnVector
-        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
-        assertThat(arrayColumnVector.isNullAt(0)).isFalse();
-        assertThat(arrayColumnVector.isNullAt(1)).isTrue();
-        assertThat(arrayColumnVector.isNullAt(2)).isFalse();
-        assertThat(arrayColumnVector.isNullAt(3)).isFalse();
-
-        RowColumnVector rowColumnVector = (RowColumnVector) 
arrayColumnVector.getColumnVector();
-        BytesColumnVector f0Vector = (BytesColumnVector) 
rowColumnVector.getBatch().columns[0];
-        for (int i = 0; i < 3; i++) {
-            BinaryString s = f0.get(i);
-            if (s == null) {
-                assertThat(f0Vector.isNullAt(i)).isTrue();
-            } else {
-                assertThat(new 
String(f0Vector.getBytes(i).getBytes())).isEqualTo(s.toString());
-            }
-        }
-        ArrayColumnVector f1Vector = (ArrayColumnVector) 
rowColumnVector.getBatch().columns[1];
-        InternalArray internalArray0 = f1Vector.getArray(0);
-        assertThat(internalArray0.size()).isEqualTo(2);
-        assertThat(internalArray0.isNullAt(0)).isFalse();
-        assertThat(internalArray0.isNullAt(1)).isTrue();
+        //        ArrayColumnVector arrayColumnVector = (ArrayColumnVector) 
batch.columns[0];
+        //        assertThat(arrayColumnVector.isNullAt(0)).isFalse();
+        //        assertThat(arrayColumnVector.isNullAt(1)).isTrue();
+        //        assertThat(arrayColumnVector.isNullAt(2)).isFalse();
+        //        assertThat(arrayColumnVector.isNullAt(3)).isFalse();
+        //
+        //        RowColumnVector rowColumnVector = (RowColumnVector)
+        // arrayColumnVector.getColumnVector();
+        //        BytesColumnVector f0Vector = (BytesColumnVector)
+        // rowColumnVector.getBatch().columns[0];
+        //        for (int i = 0; i < 3; i++) {
+        //            BinaryString s = f0.get(i);
+        //            if (s == null) {
+        //                assertThat(f0Vector.isNullAt(i)).isTrue();
+        //            } else {
+        //                assertThat(new
+        // String(f0Vector.getBytes(i).getBytes())).isEqualTo(s.toString());
+        //            }
+        //        }
+        //        ArrayColumnVector f1Vector = (ArrayColumnVector)
+        // rowColumnVector.getBatch().columns[1];
+        //        InternalArray internalArray0 = f1Vector.getArray(0);
+        //        assertThat(internalArray0.size()).isEqualTo(2);
+        //        assertThat(internalArray0.isNullAt(0)).isFalse();
+        //        assertThat(internalArray0.isNullAt(1)).isTrue();
+        //
+        //        InternalArray internalArray1 = f1Vector.getArray(1);
+        //        assertThat(internalArray1.size()).isEqualTo(0);
+        //
+        //        InternalArray internalArray2 = f1Vector.getArray(2);
+        //        assertThat(internalArray2.size()).isEqualTo(1);
+        //        assertThat(internalArray2.isNullAt(0)).isFalse();
+        //
+        //        IntColumnVector intColumnVector = (IntColumnVector) 
f1Vector.getColumnVector();
+        //        assertThat(intColumnVector.getInt(0)).isEqualTo(0);
+        //        assertThat(intColumnVector.isNullAt(1)).isTrue();
+        //        assertThat(intColumnVector.getInt(2)).isEqualTo(1);
 
-        InternalArray internalArray1 = f1Vector.getArray(1);
-        assertThat(internalArray1.size()).isEqualTo(0);
+        iterator.releaseBatch();
+    }
 
-        InternalArray internalArray2 = f1Vector.getArray(2);
-        assertThat(internalArray2.size()).isEqualTo(1);
-        assertThat(internalArray2.isNullAt(0)).isFalse();
+    @Test
+    public void testHighlyNestedSchema() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field(
+                                "row",
+                                RowType.builder()
+                                        .field("f0", 
DataTypes.ARRAY(RowType.of(DataTypes.INT())))
+                                        .field("f1", 
RowType.of(DataTypes.INT()))
+                                        .build())
+                        .build();
 
-        IntColumnVector intColumnVector = (IntColumnVector) 
f1Vector.getColumnVector();
-        assertThat(intColumnVector.getInt(0)).isEqualTo(0);
-        assertThat(intColumnVector.isNullAt(1)).isTrue();
-        assertThat(intColumnVector.getInt(2)).isEqualTo(1);
+        InternalRow row0 = GenericRow.of((Object) null);
+        InternalRow row1 = GenericRow.of(GenericRow.of(null, 
GenericRow.of(1)));
+        InternalRow row2 =
+                GenericRow.of(
+                        GenericRow.of(
+                                new GenericArray(
+                                        new GenericRow[] {
+                                            GenericRow.of((Object) null), 
GenericRow.of(22)
+                                        }),
+                                GenericRow.of((Object) null)));
+        InternalRow row3 =
+                GenericRow.of(GenericRow.of(new GenericArray(new GenericRow[] 
{null}), null));
+
+        VectorizedRecordIterator iterator =
+                createVectorizedRecordIterator(rowType, Arrays.asList(row0, 
row1, row2, row3));
 
+        // validate column vector
+        //        VectorizedColumnBatch batch = iterator.batch();
+        //        RowColumnVector row = (RowColumnVector) batch.columns[0];
+        //
+        //        assertThat(row.isNullAt(0)).isTrue();
+        //        assertThat(row.isNullAt(1)).isFalse();
+        //        assertThat(row.isNullAt(2)).isFalse();
+        //        assertThat(row.isNullAt(3)).isFalse();
+        //
+        //        ArrayColumnVector f0 = (ArrayColumnVector) 
row.getBatch().columns[0];
+        //        assertThat(f0.isNullAt(0)).isTrue();
+        //        assertThat(f0.isNullAt(1)).isTrue();
+        //        assertThat(f0.isNullAt(2)).isFalse();
+        //        assertThat(f0.isNullAt(3)).isFalse();
+        //
+        //        RowColumnVector arrayRow = (RowColumnVector) 
f0.getColumnVector();
+        //        assertThat(arrayRow.isNullAt(0)).isFalse();
+        //        assertThat(arrayRow.isNullAt(1)).isFalse();
+        //        assertThat(arrayRow.isNullAt(2)).isTrue();
+        //
+        //        IntColumnVector arrayRowInt = (IntColumnVector) 
arrayRow.getBatch().columns[0];
+        //        assertThat(arrayRowInt.isNullAt(0)).isTrue();
+        //        assertThat(arrayRowInt.isNullAt(1)).isFalse();
+        //        assertThat(arrayRowInt.isNullAt(2)).isTrue();
+        //
+        //        assertThat(arrayRowInt.getInt(1)).isEqualTo(22);
+        //
+        //        RowColumnVector f1 = (RowColumnVector) 
row.getBatch().columns[1];
+        //        assertThat(f1.isNullAt(0)).isTrue();
+        //        assertThat(f1.isNullAt(1)).isFalse();
+        //        assertThat(f1.isNullAt(2)).isFalse();
+        //        assertThat(f1.isNullAt(3)).isTrue();
+        //
+        //        IntColumnVector rowInt = (IntColumnVector) 
f1.getBatch().columns[0];
+        //        assertThat(rowInt.isNullAt(0)).isTrue();
+        //        assertThat(rowInt.isNullAt(1)).isFalse();
+        //        assertThat(rowInt.isNullAt(2)).isTrue();
+        //        assertThat(rowInt.isNullAt(3)).isTrue();
+        //
+        //        assertThat(rowInt.getInt(1)).isEqualTo(1);
+
+        // validate per row
+        InternalRow internalRow0 = iterator.next();
+        assertThat(internalRow0.isNullAt(0)).isTrue();
+
+        InternalRow internalRow1 = iterator.next();
+        assertThat(internalRow1.isNullAt(0)).isFalse();
+        InternalRow internalRow1InternalRow = internalRow1.getRow(0, 2);
+        assertThat(internalRow1InternalRow.isNullAt(0)).isTrue();
+        InternalRow internalRow1InternalRowF1 = 
internalRow1InternalRow.getRow(1, 1);
+        assertThat(internalRow1InternalRowF1.getInt(0)).isEqualTo(1);
+
+        InternalRow internalRow2 = iterator.next();
+        assertThat(internalRow2.isNullAt(0)).isFalse();
+        InternalRow internalRow2InternalRow = internalRow2.getRow(0, 2);
+        InternalArray internalRow2InternalRowF0 = 
internalRow2InternalRow.getArray(0);
+        assertThat(internalRow2InternalRowF0.size()).isEqualTo(2);
+        InternalRow i0 = internalRow2InternalRowF0.getRow(0, 1);
+        assertThat(i0.isNullAt(0)).isTrue();
+        InternalRow i1 = internalRow2InternalRowF0.getRow(1, 1);
+        assertThat(i1.getInt(0)).isEqualTo(22);
+        InternalRow internalRow2InternalRowF1 = 
internalRow2InternalRow.getRow(1, 1);
+        assertThat(internalRow2InternalRowF1.isNullAt(0)).isTrue();
+
+        InternalRow internalRow3 = iterator.next();
+        assertThat(internalRow3.isNullAt(0)).isFalse();
+        InternalRow internalRow3InternalRow = internalRow3.getRow(0, 2);
+        InternalArray internalRow3InternalRowF0 = 
internalRow3InternalRow.getArray(0);
+        assertThat(internalRow3InternalRowF0.size()).isEqualTo(1);
+        assertThat(internalRow3InternalRowF0.isNullAt(0)).isTrue();
+        assertThat(internalRow3InternalRow.isNullAt(1)).isTrue();
+
+        assertThat(iterator.next()).isNull();
         iterator.releaseBatch();
     }
 
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
index ffe4d60082..7db10bab96 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetReadWriteTest.java
@@ -34,7 +34,6 @@ import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.RecordReaderIterator;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BooleanType;
@@ -176,7 +175,11 @@ public class ParquetReadWriteTest {
                                                             new 
ArrayType(true, new IntType())))
                                             .field("c", new IntType())
                                             .build()),
-                            new IntType()));
+                            new IntType()),
+                    RowType.of(
+                            new ArrayType(RowType.of(new VarCharType(255))),
+                            RowType.of(new IntType()),
+                            new VarCharType(255)));
 
     @TempDir public File folder;
 
@@ -464,7 +467,9 @@ public class ParquetReadWriteTest {
                 format.createReader(
                         new FormatReaderContext(
                                 new LocalFileIO(), path, new 
LocalFileIO().getFileSize(path)));
-        compareNestedRow(rows, new RecordReaderIterator<>(reader));
+        List<InternalRow> results = new ArrayList<>(1283);
+        reader.forEachRemaining(results::add);
+        compareNestedRow(rows, results);
     }
 
     @Test
@@ -822,7 +827,8 @@ public class ParquetReadWriteTest {
                                                                 }),
                                                         i)
                                             }),
-                                    i)));
+                                    i),
+                            null));
         }
         return rows;
     }
@@ -873,6 +879,7 @@ public class ParquetReadWriteTest {
                 row1.add(0, i);
                 Group row2 = rowList.addGroup(0);
                 row2.add(0, i + 1);
+                f4.addGroup(0);
 
                 // add ROW<`f0` ARRAY<ROW<`b` ARRAY<ARRAY<INT>>, `c` INT>>, 
`f1` INT>>
                 Group f5 = row.addGroup("f5");
@@ -881,6 +888,7 @@ public class ParquetReadWriteTest {
                 Group insideArray = insideRow.addGroup(0);
                 createParquetDoubleNestedArray(insideArray, i);
                 insideRow.add(1, i);
+                arrayRow.addGroup(0);
                 f5.add(1, i);
                 writer.write(row);
             }
@@ -918,12 +926,12 @@ public class ParquetReadWriteTest {
         }
     }
 
-    private void compareNestedRow(
-            List<InternalRow> rows, RecordReaderIterator<InternalRow> 
iterator) throws Exception {
-        for (InternalRow origin : rows) {
-            assertThat(iterator.hasNext()).isTrue();
-            InternalRow result = iterator.next();
+    private void compareNestedRow(List<InternalRow> rows, List<InternalRow> 
results) {
+        Assertions.assertEquals(rows.size(), results.size());
 
+        for (InternalRow result : results) {
+            int index = result.getInt(0);
+            InternalRow origin = rows.get(index);
             Assertions.assertEquals(origin.getInt(0), result.getInt(0));
 
             // int[]
@@ -1011,9 +1019,11 @@ public class ParquetReadWriteTest {
                     origin.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1),
                     result.getRow(5, 2).getArray(0).getRow(0, 2).getInt(1));
             Assertions.assertEquals(origin.getRow(5, 2).getInt(1), 
result.getRow(5, 2).getInt(1));
+            Assertions.assertTrue(result.isNullAt(6));
+            Assertions.assertTrue(result.getRow(6, 2).isNullAt(0));
+            Assertions.assertTrue(result.getRow(6, 2).isNullAt(1));
+            Assertions.assertTrue(result.getRow(6, 2).isNullAt(2));
         }
-        assertThat(iterator.hasNext()).isFalse();
-        iterator.close();
     }
 
     private void fillWithMap(Map<String, String> map, InternalMap internalMap, 
int index) {

Reply via email to