This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new 2d324f0ab9 [core] fix: FormatReadBuilder serialization fail and partition bug (#6190) 2d324f0ab9 is described below commit 2d324f0ab9ffb7b560a50727a32a1088c1659a15 Author: jerry <lining....@alibaba-inc.com> AuthorDate: Wed Sep 3 16:55:12 2025 +0800 [core] fix: FormatReadBuilder serialization fail and partition bug (#6190) --- .../paimon/table/format/FormatReadBuilder.java | 14 +-- .../paimon/table/format/FormatTableScan.java | 2 +- .../org/apache/paimon/catalog/CatalogTestBase.java | 10 +- .../paimon/table/format/FormatReadBuilderTest.java | 125 +++++++++++++++++++++ 4 files changed, 139 insertions(+), 12 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java index 60eb8e1312..4c882ae4a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java @@ -20,7 +20,6 @@ package org.apache.paimon.table.format; import org.apache.paimon.CoreOptions; import org.apache.paimon.data.InternalRow; -import org.apache.paimon.format.FileFormat; import org.apache.paimon.format.FileFormatDiscover; import org.apache.paimon.format.FormatReaderContext; import org.apache.paimon.format.FormatReaderFactory; @@ -56,9 +55,8 @@ public class FormatReadBuilder implements ReadBuilder { private static final long serialVersionUID = 1L; private final FormatTable table; - private final FileFormat fileFormat; - private RowType readType; + private final CoreOptions options; @Nullable private Predicate filter; @Nullable private PartitionPredicate partitionFilter; @Nullable private Integer limit; @@ -66,8 +64,7 @@ public class FormatReadBuilder implements ReadBuilder { public FormatReadBuilder(FormatTable table) { this.table = table; this.readType = this.table.rowType(); - CoreOptions 
options = new CoreOptions(table.options()); - this.fileFormat = FileFormatDiscover.of(options).discover(options.formatType()); + this.options = new CoreOptions(table.options()); } @Override @@ -144,10 +141,11 @@ public class FormatReadBuilder implements ReadBuilder { Path filePath = dataSplit.dataPath(); FormatReaderContext formatReaderContext = new FormatReaderContext(table.fileIO(), filePath, dataSplit.length(), null); - FormatReaderFactory readerFactory = - fileFormat.createReaderFactory( - table.rowType(), readType(), PredicateBuilder.splitAnd(filter)); + FileFormatDiscover.of(options) + .discover(options.formatType()) + .createReaderFactory( + table.rowType(), readType(), PredicateBuilder.splitAnd(filter)); Pair<int[], RowType> partitionMapping = PartitionUtils.getPartitionMapping( diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java index 3555f76836..1850bc4c5c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java @@ -142,7 +142,7 @@ public class FormatTableScan implements InnerTableScan { partition2Paths) { LinkedHashMap<String, String> partitionSpec = partition2Path.getKey(); BinaryRow partitionRow = createPartitionRow(partitionSpec); - if (partitionFilter != null && partitionFilter.test(partitionRow)) { + if (partitionFilter == null || partitionFilter.test(partitionRow)) { splits.addAll( getSplits(fileIO, partition2Path.getValue(), partitionRow)); } diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java index 192e8f8171..082d2bc638 100644 --- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java @@ -661,6 +661,7 @@ 
public abstract class CatalogTestBase { diffPartitionPathFactory.newPath(), compressionType.value(), dataWithDiffPartition); + size = size + 1; partitionSpec = new HashMap<>(); partitionSpec.put("dt", "" + partitionValue); } else { @@ -675,12 +676,15 @@ public abstract class CatalogTestBase { null); write(factory, dataFilePathFactory.newPath(), compressionType.value(), datas); } - List<InternalRow> readData = read(table, predicate, projection, partitionSpec, null); - Integer limit = checkSize - 1; + List<InternalRow> readFilterData = + read(table, predicate, projection, partitionSpec, null); + assertThat(readFilterData).containsExactlyInAnyOrder(checkDatas); + List<InternalRow> readAllData = read(table, null, null, null, null); + assertThat(readAllData).hasSize(size); + int limit = checkSize - 1; List<InternalRow> readLimitData = read(table, predicate, projection, partitionSpec, limit); assertThat(readLimitData).hasSize(limit); - assertThat(readData).containsExactlyInAnyOrder(checkDatas); catalog.dropTable(Identifier.create(dbName, format), true); } } diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java new file mode 100644 index 0000000000..45cff37f3e --- /dev/null +++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.format; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.fs.Path; +import org.apache.paimon.fs.local.LocalFileIO; +import org.apache.paimon.predicate.Predicate; +import org.apache.paimon.predicate.PredicateBuilder; +import org.apache.paimon.table.FormatTable; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.InstantiationUtil; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatNoException; + +/** Test for {@link FormatReadBuilder} serialization. 
*/ +public class FormatReadBuilderTest { + + @TempDir java.nio.file.Path tempPath; + + @Test + public void testSerializeAndDeserialize() throws IOException, ClassNotFoundException { + RowType rowType = + RowType.builder() + .field("partition_key", DataTypes.STRING()) + .field("id", DataTypes.BIGINT()) + .field("data", DataTypes.STRING()) + .field("timestamp", DataTypes.TIMESTAMP()) + .build(); + + Map<String, String> options = new HashMap<>(); + options.put("file.format", "orc"); + options.put("file.compression", "zstd"); + + Path tablePath = new Path(tempPath.toUri()); + FormatTable table = + FormatTable.builder() + .fileIO(LocalFileIO.create()) + .identifier(Identifier.create("test_db", "complex_table")) + .rowType(rowType) + .partitionKeys(Arrays.asList("partition_key")) + .location(tablePath.toString()) + .format(FormatTable.Format.ORC) + .options(options) + .build(); + + FormatReadBuilder readBuilder = new FormatReadBuilder(table); + + assertThatNoException().isThrownBy(() -> InstantiationUtil.serializeObject(readBuilder)); + assertThatNoException() + .isThrownBy( + () -> + InstantiationUtil.deserializeObject( + InstantiationUtil.serializeObject(readBuilder), + getClass().getClassLoader())); + + // Add multiple filters + PredicateBuilder predicateBuilder = new PredicateBuilder(rowType); + Predicate filter1 = predicateBuilder.greaterThan(1, 1000L); // id > 1000 + Predicate filter2 = predicateBuilder.isNotNull(2); // data is not null + readBuilder.withFilter(filter1); + readBuilder.withFilter(filter2); + + // Add partition filter + Map<String, String> partitionSpec = new HashMap<>(); + partitionSpec.put("partition_key", "test_partition"); + readBuilder.withPartitionFilter(partitionSpec); + + // Add projection and limit + int[] projection = {1, 2, 3}; // project id, data, timestamp + readBuilder.withProjection(projection); + readBuilder.withLimit(500); + + // Test Java serialization + byte[] serialized = InstantiationUtil.serializeObject(readBuilder); + 
FormatReadBuilder deserialized = + InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader()); + + // Verify the deserialized object maintains all configurations + assertThat(deserialized.tableName()).isEqualTo(table.name()); + + RowType expectedProjectedType = + RowType.builder() + .field("id", DataTypes.BIGINT()) + .field("data", DataTypes.STRING()) + .field("timestamp", DataTypes.TIMESTAMP()) + .build(); + assertThat(deserialized.readType().getFieldCount()) + .isEqualTo(expectedProjectedType.getFieldCount()); + assertThat(deserialized.readType().getFieldNames()) + .isEqualTo(expectedProjectedType.getFieldNames()); + assertThat(deserialized.readType().getFieldTypes()) + .isEqualTo(expectedProjectedType.getFieldTypes()); + + // Verify that scan and read can be created + assertThat(deserialized.newScan()).isNotNull(); + assertThat(deserialized.newRead()).isNotNull(); + } +}