This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 1fd81f2ae3 [parquet] parquet reader should not return
VectorizedRowIterator for nested schema (#4749)
1fd81f2ae3 is described below
commit 1fd81f2ae37b6ccdc4392412b0590ff8488d4498
Author: Jingsong Lee <[email protected]>
AuthorDate: Sat Dec 21 21:49:22 2024 +0800
[parquet] parquet reader should not return VectorizedRowIterator for nested
schema (#4749)
---
.../paimon/data/columnar/ColumnarRowIterator.java | 22 +++-----
.../data/columnar/VectorizedColumnBatch.java | 6 ---
.../data/columnar/VectorizedRowIterator.java | 45 ++++++++++++++++
.../apache/paimon/format/orc/OrcReaderFactory.java | 5 +-
.../format/parquet/ParquetReaderFactory.java | 30 +++++++++--
.../format/parquet/ParquetColumnVectorTest.java | 62 +++++++++++++++-------
6 files changed, 125 insertions(+), 45 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
index 874c221348..faecd1ccd8 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/ColumnarRowIterator.java
@@ -23,7 +23,6 @@ import org.apache.paimon.data.PartitionInfo;
import org.apache.paimon.fs.Path;
import org.apache.paimon.reader.FileRecordIterator;
import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.reader.VectorizedRecordIterator;
import org.apache.paimon.utils.RecyclableIterator;
import org.apache.paimon.utils.VectorMappingUtils;
@@ -34,15 +33,15 @@ import javax.annotation.Nullable;
* {@link ColumnarRow#setRowId}.
*/
public class ColumnarRowIterator extends RecyclableIterator<InternalRow>
- implements FileRecordIterator<InternalRow>, VectorizedRecordIterator {
+ implements FileRecordIterator<InternalRow> {
- private final Path filePath;
- private final ColumnarRow row;
- private final Runnable recycler;
+ protected final Path filePath;
+ protected final ColumnarRow row;
+ protected final Runnable recycler;
- private int num;
- private int nextPos;
- private long nextFilePos;
+ protected int num;
+ protected int nextPos;
+ protected long nextFilePos;
public ColumnarRowIterator(Path filePath, ColumnarRow row, @Nullable
Runnable recycler) {
super(recycler);
@@ -79,7 +78,7 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
return this.filePath;
}
- public ColumnarRowIterator copy(ColumnVector[] vectors) {
+ protected ColumnarRowIterator copy(ColumnVector[] vectors) {
ColumnarRowIterator newIterator =
new ColumnarRowIterator(filePath, row.copy(vectors), recycler);
newIterator.reset(nextFilePos);
@@ -101,9 +100,4 @@ public class ColumnarRowIterator extends
RecyclableIterator<InternalRow>
}
return this;
}
-
- @Override
- public VectorizedColumnBatch batch() {
- return row.batch();
- }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
index dab5356435..4cf5f4c7c2 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedColumnBatch.java
@@ -38,12 +38,6 @@ public class VectorizedColumnBatch implements Serializable {
private static final long serialVersionUID = 8180323238728166155L;
- /**
- * This number is carefully chosen to minimize overhead and typically
allows one
- * VectorizedColumnBatch to fit in cache.
- */
- public static final int DEFAULT_SIZE = 2048;
-
private int numRows;
public final ColumnVector[] columns;
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
new file mode 100644
index 0000000000..889da334c5
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/data/columnar/VectorizedRowIterator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.data.columnar;
+
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.reader.VectorizedRecordIterator;
+
+import javax.annotation.Nullable;
+
+/** A {@link ColumnarRowIterator} with {@link VectorizedRecordIterator}. */
+public class VectorizedRowIterator extends ColumnarRowIterator implements
VectorizedRecordIterator {
+
+ public VectorizedRowIterator(Path filePath, ColumnarRow row, @Nullable
Runnable recycler) {
+ super(filePath, row, recycler);
+ }
+
+ @Override
+ public VectorizedColumnBatch batch() {
+ return row.batch();
+ }
+
+ @Override
+ protected VectorizedRowIterator copy(ColumnVector[] vectors) {
+ VectorizedRowIterator newIterator =
+ new VectorizedRowIterator(filePath, row.copy(vectors),
recycler);
+ newIterator.reset(nextFilePos);
+ return newIterator;
+ }
+}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
index db17357bfd..6683b357fd 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/orc/OrcReaderFactory.java
@@ -23,6 +23,7 @@ import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.fileindex.FileIndexResult;
import org.apache.paimon.fileindex.bitmap.BitmapIndexResult;
import org.apache.paimon.format.FormatReaderFactory;
@@ -158,7 +159,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
private final Pool.Recycler<OrcReaderBatch> recycler;
private final VectorizedColumnBatch paimonColumnBatch;
- private final ColumnarRowIterator result;
+ private final VectorizedRowIterator result;
protected OrcReaderBatch(
final Path filePath,
@@ -169,7 +170,7 @@ public class OrcReaderFactory implements
FormatReaderFactory {
this.recycler = checkNotNull(recycler);
this.paimonColumnBatch = paimonColumnBatch;
this.result =
- new ColumnarRowIterator(
+ new VectorizedRowIterator(
filePath, new ColumnarRow(paimonColumnBatch),
this::recycle);
}
diff --git
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 910f3031e0..9fef756371 100644
---
a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++
b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -19,10 +19,14 @@
package org.apache.paimon.format.parquet;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
import org.apache.paimon.data.columnar.ColumnarRow;
import org.apache.paimon.data.columnar.ColumnarRowIterator;
+import org.apache.paimon.data.columnar.MapColumnVector;
+import org.apache.paimon.data.columnar.RowColumnVector;
import org.apache.paimon.data.columnar.VectorizedColumnBatch;
+import org.apache.paimon.data.columnar.VectorizedRowIterator;
import org.apache.paimon.data.columnar.heap.ElementCountable;
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.format.FormatReaderFactory;
@@ -525,7 +529,8 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
private static class ParquetReaderBatch {
private final WritableColumnVector[] writableVectors;
- protected final VectorizedColumnBatch columnarBatch;
+ private final boolean containsNestedColumn;
+ private final VectorizedColumnBatch columnarBatch;
private final Pool.Recycler<ParquetReaderBatch> recycler;
private final ColumnarRowIterator result;
@@ -536,11 +541,30 @@ public class ParquetReaderFactory implements
FormatReaderFactory {
VectorizedColumnBatch columnarBatch,
Pool.Recycler<ParquetReaderBatch> recycler) {
this.writableVectors = writableVectors;
+ this.containsNestedColumn =
+ Arrays.stream(writableVectors)
+ .anyMatch(
+ vector ->
+ vector instanceof MapColumnVector
+ || vector instanceof
RowColumnVector
+ || vector instanceof
ArrayColumnVector);
this.columnarBatch = columnarBatch;
this.recycler = recycler;
+
+ /*
+ * See <a
href="https://github.com/apache/paimon/pull/3883">Reverted #3883</a>.
+ * If a FileRecordIterator contains a batch and the batch's nested
vectors are not compact,
+ * it's not safe to use VectorizedColumnBatch directly. Currently,
we should use {@link #next()}
+ * to handle it row by row.
+ *
+ * <p>TODO: delete this after #3883 is fixed completely.
+ */
this.result =
- new ColumnarRowIterator(
- filePath, new ColumnarRow(columnarBatch),
this::recycle);
+ containsNestedColumn
+ ? new ColumnarRowIterator(
+ filePath, new ColumnarRow(columnarBatch),
this::recycle)
+ : new VectorizedRowIterator(
+ filePath, new ColumnarRow(columnarBatch),
this::recycle);
}
public void recycle() {
diff --git
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
index 0d862c3963..873a3f036d 100644
---
a/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
+++
b/paimon-format/src/test/java/org/apache/paimon/format/parquet/ParquetColumnVectorTest.java
@@ -29,8 +29,8 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.columnar.ArrayColumnVector;
import org.apache.paimon.data.columnar.BytesColumnVector;
import org.apache.paimon.data.columnar.ColumnVector;
+import org.apache.paimon.data.columnar.ColumnarRowIterator;
import org.apache.paimon.data.columnar.IntColumnVector;
-import org.apache.paimon.data.columnar.VectorizedColumnBatch;
import org.apache.paimon.format.FormatReaderContext;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.parquet.writer.RowDataParquetBuilder;
@@ -64,6 +64,7 @@ import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.paimon.data.BinaryString.fromString;
import static org.assertj.core.api.Assertions.assertThat;
/** Validate the {@link ColumnVector}s read by Parquet format. */
@@ -78,6 +79,25 @@ public class ParquetColumnVectorTest {
? "null"
: new String(((BytesColumnVector)
cv).getBytes(i).getBytes());
+ @Test
+ public void testNormalStrings() throws IOException {
+ RowType rowType =
+ RowType.builder()
+ .field("s1", DataTypes.STRING())
+ .field("s2", DataTypes.STRING())
+ .field("s3", DataTypes.STRING())
+ .build();
+
+ int numRows = RND.nextInt(5) + 5;
+ List<InternalRow> rows = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; i++) {
+ rows.add(GenericRow.of(fromString(i + ""), fromString(i + ""),
fromString(i + "")));
+ }
+
+ ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+ assertThat(iterator).isInstanceOf(VectorizedRecordIterator.class);
+ }
+
@Test
public void testArrayString() throws IOException {
RowType rowType =
@@ -107,8 +127,8 @@ public class ParquetColumnVectorTest {
rows.add(GenericRow.of(array));
}
- VectorizedRecordIterator iterator =
createVectorizedRecordIterator(rowType, rows);
- VectorizedColumnBatch batch = iterator.batch();
+ ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+ assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter =
InternalArray.createElementGetter(DataTypes.STRING());
// validate row by row
@@ -175,8 +195,8 @@ public class ParquetColumnVectorTest {
rows.add(GenericRow.of(new GenericArray(innerArrays)));
}
- VectorizedRecordIterator iterator =
createVectorizedRecordIterator(rowType, rows);
- VectorizedColumnBatch batch = iterator.batch();
+ ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+ assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter =
InternalArray.createElementGetter(DataTypes.STRING());
// validate row by row
@@ -224,13 +244,13 @@ public class ParquetColumnVectorTest {
expectedData.add(currentStringArray);
Map<Integer, BinaryString> map = new HashMap<>();
for (int idx = 0; idx < currentSize; idx++) {
- map.put(idx,
BinaryString.fromString(currentStringArray.get(idx)));
+ map.put(idx, fromString(currentStringArray.get(idx)));
}
rows.add(GenericRow.of(new GenericMap(map)));
}
- VectorizedRecordIterator iterator =
createVectorizedRecordIterator(rowType, rows);
- VectorizedColumnBatch batch = iterator.batch();
+ ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+ assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter =
InternalArray.createElementGetter(DataTypes.STRING());
// validate row by row
@@ -310,8 +330,9 @@ public class ParquetColumnVectorTest {
rows.add(GenericRow.of(new GenericMap(map)));
}
- VectorizedRecordIterator iterator =
createVectorizedRecordIterator(rowType, rows);
- VectorizedColumnBatch batch = iterator.batch();
+ ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+ assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
+
InternalArray.ElementGetter getter =
InternalArray.createElementGetter(DataTypes.STRING());
// validate row by row
@@ -420,8 +441,8 @@ public class ParquetColumnVectorTest {
rows.add(GenericRow.of(GenericRow.of(i, array)));
}
- VectorizedRecordIterator iterator =
createVectorizedRecordIterator(rowType, rows);
- VectorizedColumnBatch batch = iterator.batch();
+ ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+ assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
InternalArray.ElementGetter getter =
InternalArray.createElementGetter(DataTypes.STRING());
// validate row by row
@@ -487,7 +508,7 @@ public class ParquetColumnVectorTest {
List<InternalRow> rows = new ArrayList<>(4);
List<BinaryString> f0 = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
- f0.add(BinaryString.fromString(randomString()));
+ f0.add(fromString(randomString()));
}
GenericRow row00 = GenericRow.of(f0.get(0), new GenericArray(new
Object[] {0, null}));
@@ -504,8 +525,8 @@ public class ParquetColumnVectorTest {
GenericArray array3 = new GenericArray(new GenericRow[] {});
rows.add(GenericRow.of(array3));
- VectorizedRecordIterator iterator =
createVectorizedRecordIterator(rowType, rows);
- VectorizedColumnBatch batch = iterator.batch();
+ ColumnarRowIterator iterator = createRecordIterator(rowType, rows);
+ assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
// validate row by row
InternalRow row0 = iterator.next();
@@ -625,8 +646,9 @@ public class ParquetColumnVectorTest {
InternalRow row3 =
GenericRow.of(GenericRow.of(new GenericArray(new GenericRow[]
{null}), null));
- VectorizedRecordIterator iterator =
- createVectorizedRecordIterator(rowType, Arrays.asList(row0,
row1, row2, row3));
+ ColumnarRowIterator iterator =
+ createRecordIterator(rowType, Arrays.asList(row0, row1, row2,
row3));
+ assertThat(iterator).isNotInstanceOf(VectorizedRecordIterator.class);
// validate column vector
// VectorizedColumnBatch batch = iterator.batch();
@@ -704,8 +726,8 @@ public class ParquetColumnVectorTest {
iterator.releaseBatch();
}
- private VectorizedRecordIterator createVectorizedRecordIterator(
- RowType rowType, List<InternalRow> rows) throws IOException {
+ private ColumnarRowIterator createRecordIterator(RowType rowType,
List<InternalRow> rows)
+ throws IOException {
Path path = new Path(tempDir.toString(), UUID.randomUUID().toString());
LocalFileIO fileIO = LocalFileIO.create();
@@ -725,7 +747,7 @@ public class ParquetColumnVectorTest {
new FormatReaderContext(fileIO, path,
fileIO.getFileSize(path)));
RecordReader.RecordIterator<InternalRow> iterator = reader.readBatch();
- return (VectorizedRecordIterator) iterator;
+ return (ColumnarRowIterator) iterator;
}
@Nullable