This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d324f0ab9 [core] fix: FormatReadBuilder serialization fail and partition bug (#6190)
2d324f0ab9 is described below

commit 2d324f0ab9ffb7b560a50727a32a1088c1659a15
Author: jerry <lining....@alibaba-inc.com>
AuthorDate: Wed Sep 3 16:55:12 2025 +0800

    [core] fix: FormatReadBuilder serialization fail and partition bug (#6190)
---
 .../paimon/table/format/FormatReadBuilder.java     |  14 +--
 .../paimon/table/format/FormatTableScan.java       |   2 +-
 .../org/apache/paimon/catalog/CatalogTestBase.java |  10 +-
 .../paimon/table/format/FormatReadBuilderTest.java | 125 +++++++++++++++++++++
 4 files changed, 139 insertions(+), 12 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
index 60eb8e1312..4c882ae4a6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatReadBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.table.format;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.format.FormatReaderContext;
 import org.apache.paimon.format.FormatReaderFactory;
@@ -56,9 +55,8 @@ public class FormatReadBuilder implements ReadBuilder {
     private static final long serialVersionUID = 1L;
 
     private final FormatTable table;
-    private final FileFormat fileFormat;
-
     private RowType readType;
+    private final CoreOptions options;
     @Nullable private Predicate filter;
     @Nullable private PartitionPredicate partitionFilter;
     @Nullable private Integer limit;
@@ -66,8 +64,7 @@ public class FormatReadBuilder implements ReadBuilder {
     public FormatReadBuilder(FormatTable table) {
         this.table = table;
         this.readType = this.table.rowType();
-        CoreOptions options = new CoreOptions(table.options());
-        this.fileFormat = FileFormatDiscover.of(options).discover(options.formatType());
+        this.options = new CoreOptions(table.options());
     }
 
     @Override
@@ -144,10 +141,11 @@ public class FormatReadBuilder implements ReadBuilder {
         Path filePath = dataSplit.dataPath();
         FormatReaderContext formatReaderContext =
                 new FormatReaderContext(table.fileIO(), filePath, dataSplit.length(), null);
-
         FormatReaderFactory readerFactory =
-                fileFormat.createReaderFactory(
-                        table.rowType(), readType(), PredicateBuilder.splitAnd(filter));
+                FileFormatDiscover.of(options)
+                        .discover(options.formatType())
+                        .createReaderFactory(
+                                table.rowType(), readType(), PredicateBuilder.splitAnd(filter));
 
         Pair<int[], RowType> partitionMapping =
                 PartitionUtils.getPartitionMapping(
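
The change above removes the eagerly created FileFormat field, presumably because that object is not serializable, and instead keeps only the CoreOptions so the format is discovered when newRead() builds the reader factory. Below is a minimal, generic sketch of that pattern; class, field, and method names are illustrative only and are not Paimon API.

import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

public class LazyFormatHolder implements Serializable {

    private static final long serialVersionUID = 1L;

    // serializable configuration is the only state that is stored
    private final HashMap<String, String> options;

    public LazyFormatHolder(Map<String, String> options) {
        this.options = new HashMap<>(options);
    }

    // the non-serializable reader factory is built on demand instead of
    // being held as a field, so Java serialization never touches it
    public String createReaderFactory() {
        return discover(options.getOrDefault("file.format", "orc"));
    }

    private String discover(String format) {
        // placeholder for the real format-discovery step
        return "reader-factory-for-" + format;
    }

    public static void main(String[] args) throws Exception {
        LazyFormatHolder holder = new LazyFormatHolder(new HashMap<>());
        // the round trip succeeds because no non-serializable object is
        // reachable from the serialized object graph
        java.io.ByteArrayOutputStream bytes = new java.io.ByteArrayOutputStream();
        new java.io.ObjectOutputStream(bytes).writeObject(holder);
        System.out.println("serialized " + bytes.size() + " bytes");
    }
}
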
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
index 3555f76836..1850bc4c5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableScan.java
@@ -142,7 +142,7 @@ public class FormatTableScan implements InnerTableScan {
                             partition2Paths) {
                         LinkedHashMap<String, String> partitionSpec = partition2Path.getKey();
                         BinaryRow partitionRow = createPartitionRow(partitionSpec);
-                        if (partitionFilter != null && partitionFilter.test(partitionRow)) {
+                        if (partitionFilter == null || partitionFilter.test(partitionRow)) {
                             splits.addAll(
                                     getSplits(fileIO, partition2Path.getValue(), partitionRow));
                         }
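
This one-character change is the partition bug from the commit title: with the old "&&" guard, a scan with no partition filter configured (partitionFilter == null) skipped every partition and returned nothing. Treating a null filter as "accept all partitions" restores the expected behaviour. A small standalone illustration of the intended semantics, using a generic java.util.function.Predicate rather than the Paimon PartitionPredicate API:

import java.util.function.Predicate;

public class PartitionFilterSemantics {

    // mirrors the fixed condition: a missing filter keeps every partition
    static <T> boolean keep(Predicate<T> filter, T partitionRow) {
        return filter == null || filter.test(partitionRow);
    }

    public static void main(String[] args) {
        Predicate<String> dt2025 = row -> row.contains("dt=2025");

        System.out.println(keep(null, "dt=20250903"));    // true: no filter configured
        System.out.println(keep(dt2025, "dt=20250903"));  // true: filter matches
        System.out.println(keep(dt2025, "dt=20240101"));  // false: filter rejects
    }
}
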
diff --git a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 192e8f8171..082d2bc638 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -661,6 +661,7 @@ public abstract class CatalogTestBase {
                         diffPartitionPathFactory.newPath(),
                         compressionType.value(),
                         dataWithDiffPartition);
+                size = size + 1;
                 partitionSpec = new HashMap<>();
                 partitionSpec.put("dt", "" + partitionValue);
             } else {
@@ -675,12 +676,15 @@ public abstract class CatalogTestBase {
                                 null);
                 write(factory, dataFilePathFactory.newPath(), compressionType.value(), datas);
             }
-            List<InternalRow> readData = read(table, predicate, projection, partitionSpec, null);
-            Integer limit = checkSize - 1;
+            List<InternalRow> readFilterData =
+                    read(table, predicate, projection, partitionSpec, null);
+            assertThat(readFilterData).containsExactlyInAnyOrder(checkDatas);
+            List<InternalRow> readAllData = read(table, null, null, null, null);
+            assertThat(readAllData).hasSize(size);
+            int limit = checkSize - 1;
             List<InternalRow> readLimitData =
                     read(table, predicate, projection, partitionSpec, limit);
             assertThat(readLimitData).hasSize(limit);
-            assertThat(readData).containsExactlyInAnyOrder(checkDatas);
             catalog.dropTable(Identifier.create(dbName, format), true);
         }
     }
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
new file mode 100644
index 0000000000..45cff37f3e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/format/FormatReadBuilderTest.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.InstantiationUtil;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatNoException;
+
+/** Test for {@link FormatReadBuilder} serialization. */
+public class FormatReadBuilderTest {
+
+    @TempDir java.nio.file.Path tempPath;
+
+    @Test
+    public void testSerializeAndDeserialize() throws IOException, ClassNotFoundException {
+        RowType rowType =
+                RowType.builder()
+                        .field("partition_key", DataTypes.STRING())
+                        .field("id", DataTypes.BIGINT())
+                        .field("data", DataTypes.STRING())
+                        .field("timestamp", DataTypes.TIMESTAMP())
+                        .build();
+
+        Map<String, String> options = new HashMap<>();
+        options.put("file.format", "orc");
+        options.put("file.compression", "zstd");
+
+        Path tablePath = new Path(tempPath.toUri());
+        FormatTable table =
+                FormatTable.builder()
+                        .fileIO(LocalFileIO.create())
+                        .identifier(Identifier.create("test_db", "complex_table"))
+                        .rowType(rowType)
+                        .partitionKeys(Arrays.asList("partition_key"))
+                        .location(tablePath.toString())
+                        .format(FormatTable.Format.ORC)
+                        .options(options)
+                        .build();
+
+        FormatReadBuilder readBuilder = new FormatReadBuilder(table);
+
+        assertThatNoException().isThrownBy(() -> InstantiationUtil.serializeObject(readBuilder));
+        assertThatNoException()
+                .isThrownBy(
+                        () ->
+                                InstantiationUtil.deserializeObject(
+                                        InstantiationUtil.serializeObject(readBuilder),
+                                        getClass().getClassLoader()));
+
+        // Add multiple filters
+        PredicateBuilder predicateBuilder = new PredicateBuilder(rowType);
+        Predicate filter1 = predicateBuilder.greaterThan(1, 1000L); // id > 1000
+        Predicate filter2 = predicateBuilder.isNotNull(2); // data is not null
+        readBuilder.withFilter(filter1);
+        readBuilder.withFilter(filter2);
+
+        // Add partition filter
+        Map<String, String> partitionSpec = new HashMap<>();
+        partitionSpec.put("partition_key", "test_partition");
+        readBuilder.withPartitionFilter(partitionSpec);
+
+        // Add projection and limit
+        int[] projection = {1, 2, 3}; // project id, data, timestamp
+        readBuilder.withProjection(projection);
+        readBuilder.withLimit(500);
+
+        // Test Java serialization
+        byte[] serialized = InstantiationUtil.serializeObject(readBuilder);
+        FormatReadBuilder deserialized =
+                InstantiationUtil.deserializeObject(serialized, getClass().getClassLoader());
+
+        // Verify the deserialized object maintains all configurations
+        assertThat(deserialized.tableName()).isEqualTo(table.name());
+
+        RowType expectedProjectedType =
+                RowType.builder()
+                        .field("id", DataTypes.BIGINT())
+                        .field("data", DataTypes.STRING())
+                        .field("timestamp", DataTypes.TIMESTAMP())
+                        .build();
+        assertThat(deserialized.readType().getFieldCount())
+                .isEqualTo(expectedProjectedType.getFieldCount());
+        assertThat(deserialized.readType().getFieldNames())
+                .isEqualTo(expectedProjectedType.getFieldNames());
+        assertThat(deserialized.readType().getFieldTypes())
+                .isEqualTo(expectedProjectedType.getFieldTypes());
+
+        // Verify that scan and read can be created
+        assertThat(deserialized.newScan()).isNotNull();
+        assertThat(deserialized.newRead()).isNotNull();
+    }
+}
