This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 1fd81f2ae3 [parquet] parquet reader should not return 
VectorizedRowIterator for nested schema (#4749)
1fd81f2ae3 is described below

commit 1fd81f2ae37b6ccdc4392412b0590ff8488d4498
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Dec 21 21:49:22 2024 +0800

    [parquet] parquet reader should not return VectorizedRowIterator for nested 
schema (#4749)
---
 .../paimon/data/columnar/ColumnarRowIterator.java  | 22 +++-----
 .../data/columnar/VectorizedColumnBatch.java       |  6 ---
 .../data/columnar/VectorizedRowIterator.java       | 45 ++++++++++++++++
 .../apache/paimon/format/orc/OrcReaderFactory.java |  5 +-
 .../format/parquet/ParquetReaderFactory.java       | 30 +++++++++--
 .../format/parquet/ParquetColumnVectorTest.java    | 62 +++++++++++++++-------
 6 files changed, 125 insertions(+), 45 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 874c221348..faecd1ccd8 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.PartitionInfo;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.VectorizedRecordIterator;
 import org.apache.paimon.utils.RecyclableIterator;
 import org.apache.paimon.utils.VectorMappingUtils;
 
@@ -34,15 +33,15 @@ import javax.annotation.Nullable;
  * {@link ColumnarRow#setRowId}.
  */
 public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
-        implements FileRecordIterator<InternalRow>, VectorizedRecordIterator {
+        implements FileRecordIterator<InternalRow> {
 
-    private final Path filePath;
-    private final ColumnarRow row;
-    private final Runnable recycler;
+    protected final Path filePath;
+    protected final ColumnarRow row;
+    protected final Runnable recycler;
 
-    private int num;
-    private int nextPos;
-    private long nextFilePos;
+    protected int num;
+    protected int nextPos;
+    protected long nextFilePos;
 
     public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable 
Runnable recycler) {
         super(recycler);
@@ -79,7 +78,7 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
         return this.filePath;
     }
 
-    public ColumnarRowIterator copy(ColumnVector[] vectors) {
+    protected ColumnarRowIterator copy(ColumnVector[] vectors) {
         ColumnarRowIterator newIterator =
                 new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
         newIterator.reset(nextFilePos);
@@ -101,9 +100,4 @@ public class ColumnarRowIterator extends 
RecyclableIterator<InternalRow>
         }
         return this;
     }
-
-    @Override
-    public VectorizedColumnBatch batch() {
-        return row.batch();
-    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
index dab5356435..4cf5f4c7c2 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
@@ -38,12 +38,6 @@ public class VectorizedColumnBatch implements Serializable {
 
     private static final long serialVersionUID = 8180323238728166155L;
 
-    /**
-     * This number is carefully chosen to minimize overhead and typically 
allows one
-     * VectorizedColumnBatch to fit in cache.
-     */
-    public static final int DEFAULT_SIZE = 2048;
-
     private int numRows;
 
     public final ColumnVector[] columns;
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
new file mode 100644
index 0000000000..889da334c5
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+
+import javax.annotation.Nullable;
+
+/** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */
+public class VectorizedRowIterator extends ColumnarRowIterator implements 
VectorizedRecordIterator {
+
+    public VectorizedRowIterator(Path filePath, ColumnarRow row, @Nullable 
Runnable recycler) {
+        super(filePath, row, recycler);
+    }
+
+    @Override
+    public VectorizedColumnBatch batch() {
+        return row.batch();
+    }
+
+    @Override
+    protected VectorizedRowIterator copy(ColumnVector[] vectors) {
+        VectorizedRowIterator newIterator =
+                new VectorizedRowIterator(filePath, row.copy(vectors), 
recycler);
+        newIterator.reset(nextFilePos);
+        return newIterator;
+    }
+}
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index db17357bfd..6683b357fd 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.VectorizedRowIterator;
 import org.apache.paimon.fileindex.FileIndexResult;
 import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -158,7 +159,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
         private final Pool.Recycler<OrcReaderBatch> recycler;
 
         private final VectorizedColumnBatch paimonColumnBatch;
-        private final ColumnarRowIterator result;
+        private final VectorizedRowIterator result;
 
         protected OrcReaderBatch(
                 final Path filePath,
@@ -169,7 +170,7 @@ public class OrcReaderFactory implements 
FormatReaderFactory {
             this.recycler = checkNotNull(recycler);
             this.paimonColumnBatch = paimonColumnBatch;
             this.result =
-                    new ColumnarRowIterator(
+                    new VectorizedRowIterator(
                             filePath, new ColumnarRow(paimonColumnBatch), 
this::recycle);
         }
 
diff --git 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 910f3031e0..9fef756371 100644
--- 
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ 
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -19,10 +19,14 @@
 package org.apache.paimon.format.parquet;
 
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ArrayColumnVector;
 import org.apache.paimon.data.columnar.ColumnVector;
 import org.apache.paimon.data.columnar.ColumnarRow;
 import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.data.columnar.MapColumnVector;
+import org.apache.paimon.data.columnar.RowColumnVector;
 import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.VectorizedRowIterator;
 import org.apache.paimon.data.columnar.heap.ElementCountable;
 import org.apache.paimon.data.columnar.writable.WritableColumnVector;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -525,7 +529,8 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
     private static class ParquetReaderBatch {
 
         private final WritableColumnVector[] writableVectors;
-        protected final VectorizedColumnBatch columnarBatch;
+        private final boolean containsNestedColumn;
+        private final VectorizedColumnBatch columnarBatch;
         private final Pool.Recycler<ParquetReaderBatch> recycler;
 
         private final ColumnarRowIterator result;
@@ -536,11 +541,30 @@ public class ParquetReaderFactory implements 
FormatReaderFactory {
                 VectorizedColumnBatch columnarBatch,
                 Pool.Recycler<ParquetReaderBatch> recycler) {
             this.writableVectors = writableVectors;
+            this.containsNestedColumn =
+                    Arrays.stream(writableVectors)
+                            .anyMatch(
+                                    vector ->
+                                            vector instanceof MapColumnVector
+                                                    || vector instanceof 
RowColumnVector
+                                                    || vector instanceof 
ArrayColumnVector);
             this.columnarBatch = columnarBatch;
             this.recycler = recycler;
+
+            /*
+             * See <a 
href="https://github.com/apache/paimon/pull/3883">Reverted #3883</a>.
+             * If a FileRecordIterator contains a batch and the batch's nested 
vectors are not compact,
+             * it's not safe to use VectorizedColumnBatch directly. Currently, 
we should use {@link #next()}
+             * to handle it row by row.
+             *
+             * <p>TODO: delete this after #3883 is fixed completely.
+             */
             this.result =
-                    new ColumnarRowIterator(
-                            filePath, new ColumnarRow(columnarBatch), 
this::recycle);
+                    containsNestedColumn
+                            ? new ColumnarRowIterator(
+                                    filePath, new ColumnarRow(columnarBatch), 
this::recycle)
+                            : new VectorizedRowIterator(
+                                    filePath, new ColumnarRow(columnarBatch), 
this::recycle);
         }
 
         public void recycle() {
diff --git 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
index 0d862c3963..873a3f036d 100644
--- 
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
+++ 
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
@@ -29,8 +29,8 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.columnar.ArrayColumnVector;
 import org.apache.paimon.data.columnar.BytesColumnVector;
 import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.ColumnarRowIterator;
 import org.apache.paimon.data.columnar.IntColumnVector;
-import org.apache.paimon.data.columnar.VectorizedColumnBatch;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
@@ -64,6 +64,7 @@ import java.util.function.BiFunction;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.data.BinaryString.fromString;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Validate the {@link ColumnVector}s read by Parquet format. */
@@ -78,6 +79,25 @@ public class ParquetColumnVectorTest {
                             ? "null"
                             : new String(((BytesColumnVector) 
cv).getBytes(i).getBytes());
 
+    @Test
+    public void testNormalStrings() throws IOException {
+        RowType rowType =
+                RowType.builder()
+                        .field("s1", DataTypes.STRING())
+                        .field("s2", DataTypes.STRING())
+                        .field("s3", DataTypes.STRING())
+                        .build();
+
+        int numRows = RND.nextInt(5) + 5;
+        List<InternalRow> rows = new ArrayList<>(numRows);
+        for (int i = 0; i < numRows; i++) {
+            rows.add(GenericRow.of(fromString(i + ""), fromString(i + ""), 
fromString(i + "")));
+        }
+
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isInstanceOf(VectorizedRecordIterator.class);
+    }
+
     @Test
     public void testArrayString() throws IOException {
         RowType rowType =
@@ -107,8 +127,8 @@ public class ParquetColumnVectorTest {
             rows.add(GenericRow.of(array));
         }
 
-        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -175,8 +195,8 @@ public class ParquetColumnVectorTest {
             rows.add(GenericRow.of(new GenericArray(innerArrays)));
         }
 
-        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -224,13 +244,13 @@ public class ParquetColumnVectorTest {
             expectedData.add(currentStringArray);
             Map<Integer, BinaryString> map = new HashMap<>();
             for (int idx = 0; idx < currentSize; idx++) {
-                map.put(idx, 
BinaryString.fromString(currentStringArray.get(idx)));
+                map.put(idx, fromString(currentStringArray.get(idx)));
             }
             rows.add(GenericRow.of(new GenericMap(map)));
         }
 
-        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -310,8 +330,9 @@ public class ParquetColumnVectorTest {
             rows.add(GenericRow.of(new GenericMap(map)));
         }
 
-        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
+
         InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -420,8 +441,8 @@ public class ParquetColumnVectorTest {
             rows.add(GenericRow.of(GenericRow.of(i, array)));
         }
 
-        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
         InternalArray.ElementGetter getter = 
InternalArray.createElementGetter(DataTypes.STRING());
 
         // validate row by row
@@ -487,7 +508,7 @@ public class ParquetColumnVectorTest {
         List<InternalRow> rows = new ArrayList<>(4);
         List<BinaryString> f0 = new ArrayList<>(3);
         for (int i = 0; i < 3; i++) {
-            f0.add(BinaryString.fromString(randomString()));
+            f0.add(fromString(randomString()));
         }
 
         GenericRow row00 = GenericRow.of(f0.get(0), new GenericArray(new 
Object[] {0, null}));
@@ -504,8 +525,8 @@ public class ParquetColumnVectorTest {
         GenericArray array3 = new GenericArray(new GenericRow[] {});
         rows.add(GenericRow.of(array3));
 
-        VectorizedRecordIterator iterator = 
createVectorizedRecordIterator(rowType, rows);
-        VectorizedColumnBatch batch = iterator.batch();
+        ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
 
         // validate row by row
         InternalRow row0 = iterator.next();
@@ -625,8 +646,9 @@ public class ParquetColumnVectorTest {
         InternalRow row3 =
                 GenericRow.of(GenericRow.of(new GenericArray(new GenericRow[] 
{null}), null));
 
-        VectorizedRecordIterator iterator =
-                createVectorizedRecordIterator(rowType, Arrays.asList(row0, 
row1, row2, row3));
+        ColumnarRowIterator iterator =
+                createRecordIterator(rowType, Arrays.asList(row0, row1, row2, 
row3));
+        assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
 
         // validate column vector
         //        VectorizedColumnBatch batch = iterator.batch();
@@ -704,8 +726,8 @@ public class ParquetColumnVectorTest {
         iterator.releaseBatch();
     }
 
-    private VectorizedRecordIterator createVectorizedRecordIterator(
-            RowType rowType, List<InternalRow> rows) throws IOException {
+    private ColumnarRowIterator createRecordIterator(RowType rowType, 
List<InternalRow> rows)
+            throws IOException {
         Path path = new Path(tempDir.toString(), UUID.randomUUID().toString());
         LocalFileIO fileIO = LocalFileIO.create();
 
@@ -725,7 +747,7 @@ public class ParquetColumnVectorTest {
                         new FormatReaderContext(fileIO, path, 
fileIO.getFileSize(path)));
 
         RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
-        return (VectorizedRecordIterator) iterator;
+        return (ColumnarRowIterator) iterator;
     }
 
     @Nullable

Reply via email to