This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new aac417c1a1 [flink] Fix Flink Lookup Join for Postpone bucket table (#5537)
aac417c1a1 is described below

commit aac417c1a149bafdc182114fba5ab20770d36c17
Author: Jingsong Lee <jingsongl...@gmail.com>
AuthorDate: Fri Apr 25 21:28:13 2025 +0800

    [flink] Fix Flink Lookup Join for Postpone bucket table (#5537)
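
    Postpone bucket tables ('bucket' = '-2') can end up with a different
    bucket count per partition after sys.rescale, so the lookup side can
    no longer derive a record's bucket from a single schema-level bucket
    count. This change removes FixedBucketFromPkExtractor and instead
    resolves the bucket count per partition at lookup time through the
    new QueryExecutor#numBuckets(partition) method; a partition with no
    data yet simply yields no matches. A condensed sketch of the new
    lookup flow (taken from the PrimaryKeyPartialLookupTable diff below,
    where partitionFromPk and bucketKeyFromPk are projections built from
    the primary key type):

        BinaryRow partition = partitionFromPk.apply(adjustedKey);
        Integer numBuckets = queryExecutor.numBuckets(partition);
        if (numBuckets == null) {
            // no files for this partition yet, nothing to join against
            return Collections.emptyList();
        }
        BinaryRow bucketKey = bucketKeyFromPk.apply(adjustedKey);
        int bucket =
                KeyAndBucketExtractor.bucket(
                        KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);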
---
 .../apache/paimon/flink/action/CompactAction.java  |   2 +-
 .../flink/lookup/FixedBucketFromPkExtractor.java   | 110 ------------------
 .../flink/lookup/PrimaryKeyPartialLookupTable.java | 100 ++++++++++++++---
 .../paimon/flink/source/BaseDataTableSource.java   |   6 +-
 .../paimon/flink/PostponeBucketTableITCase.java    | 125 +++++++++++++++++++++
 .../flink/lookup/FileStoreLookupFunctionTest.java  |  23 ++++
 6 files changed, 241 insertions(+), 125 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 306216a57d..0244a20547 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -259,7 +259,7 @@ public class CompactAction extends TableActionBase {
         InternalRowPartitionComputer partitionComputer =
                 new InternalRowPartitionComputer(
                         fileStoreTable.coreOptions().partitionDefaultName(),
-                        fileStoreTable.rowType(),
+                        fileStoreTable.store().partitionType(),
                         fileStoreTable.partitionKeys().toArray(new String[0]),
                         fileStoreTable.coreOptions().legacyPartitionName());
         String commitUser = CoreOptions.createCommitUser(options);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java
deleted file mode 100644
index 61617988a6..0000000000
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FixedBucketFromPkExtractor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.lookup;
-
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.codegen.CodeGenUtils;
-import org.apache.paimon.codegen.Projection;
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.schema.TableSchema;
-import org.apache.paimon.table.sink.KeyAndBucketExtractor;
-
-import static org.apache.paimon.utils.Preconditions.checkArgument;
-
-/** Extractor to extract bucket from the primary key. */
-public class FixedBucketFromPkExtractor implements KeyAndBucketExtractor<InternalRow> {
-
-    private transient InternalRow primaryKey;
-
-    private final boolean sameBucketKeyAndTrimmedPrimaryKey;
-
-    private final int numBuckets;
-
-    private final Projection bucketKeyProjection;
-
-    private final Projection trimmedPrimaryKeyProjection;
-
-    private final Projection partitionProjection;
-
-    private final Projection logPrimaryKeyProjection;
-
-    public FixedBucketFromPkExtractor(TableSchema schema) {
-        this.numBuckets = new CoreOptions(schema.options()).bucket();
-        checkArgument(numBuckets > 0, "Num bucket is illegal: " + numBuckets);
-        this.sameBucketKeyAndTrimmedPrimaryKey =
-                schema.bucketKeys().equals(schema.trimmedPrimaryKeys());
-        this.bucketKeyProjection =
-                CodeGenUtils.newProjection(
-                        schema.logicalPrimaryKeysType(),
-                        schema.bucketKeys().stream()
-                                .mapToInt(schema.primaryKeys()::indexOf)
-                                .toArray());
-        this.trimmedPrimaryKeyProjection =
-                CodeGenUtils.newProjection(
-                        schema.logicalPrimaryKeysType(),
-                        schema.trimmedPrimaryKeys().stream()
-                                .mapToInt(schema.primaryKeys()::indexOf)
-                                .toArray());
-        this.partitionProjection =
-                CodeGenUtils.newProjection(
-                        schema.logicalPrimaryKeysType(),
-                        schema.partitionKeys().stream()
-                                .mapToInt(schema.primaryKeys()::indexOf)
-                                .toArray());
-        this.logPrimaryKeyProjection =
-                CodeGenUtils.newProjection(
-                        schema.logicalRowType(), schema.projection(schema.primaryKeys()));
-    }
-
-    @Override
-    public void setRecord(InternalRow record) {
-        this.primaryKey = record;
-    }
-
-    @Override
-    public BinaryRow partition() {
-        return partitionProjection.apply(primaryKey);
-    }
-
-    private BinaryRow bucketKey() {
-        if (sameBucketKeyAndTrimmedPrimaryKey) {
-            return trimmedPrimaryKey();
-        }
-
-        return bucketKeyProjection.apply(primaryKey);
-    }
-
-    @Override
-    public int bucket() {
-        BinaryRow bucketKey = bucketKey();
-        return KeyAndBucketExtractor.bucket(
-                KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
-    }
-
-    @Override
-    public BinaryRow trimmedPrimaryKey() {
-        return trimmedPrimaryKeyProjection.apply(primaryKey);
-    }
-
-    @Override
-    public BinaryRow logPrimaryKey() {
-        return logPrimaryKeyProjection.apply(primaryKey);
-    }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
index 255351767c..24fd87a213 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/PrimaryKeyPartialLookupTable.java
@@ -19,15 +19,19 @@
 package org.apache.paimon.flink.lookup;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.disk.IOManagerImpl;
 import org.apache.paimon.flink.query.RemoteTableQuery;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
@@ -43,14 +47,18 @@ import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Lookup table for primary key which supports to read the LSM tree directly. */
 public class PrimaryKeyPartialLookupTable implements LookupTable {
 
     private final QueryExecutorFactory executorFactory;
-    private final FixedBucketFromPkExtractor extractor;
     @Nullable private final ProjectedRow keyRearrange;
     @Nullable private final ProjectedRow trimmedKeyRearrange;
 
@@ -58,16 +66,30 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
     @Nullable private Filter<InternalRow> cacheRowFilter;
     private QueryExecutor queryExecutor;
 
+    private final Projection partitionFromPk;
+    private final Projection bucketKeyFromPk;
+
     private PrimaryKeyPartialLookupTable(
             QueryExecutorFactory executorFactory, FileStoreTable table, List<String> joinKey) {
         this.executorFactory = executorFactory;
-
         if (table.bucketMode() != BucketMode.HASH_FIXED) {
             throw new UnsupportedOperationException(
                     "Unsupported mode for partial lookup: " + 
table.bucketMode());
         }
 
-        this.extractor = new FixedBucketFromPkExtractor(table.schema());
+        TableSchema schema = table.schema();
+        this.partitionFromPk =
+                CodeGenUtils.newProjection(
+                        schema.logicalPrimaryKeysType(),
+                        schema.partitionKeys().stream()
+                                .mapToInt(schema.primaryKeys()::indexOf)
+                                .toArray());
+        this.bucketKeyFromPk =
+                CodeGenUtils.newProjection(
+                        schema.logicalPrimaryKeysType(),
+                        schema.bucketKeys().stream()
+                                .mapToInt(schema.primaryKeys()::indexOf)
+                                .toArray());
 
         ProjectedRow keyRearrange = null;
         if (!table.primaryKeys().equals(joinKey)) {
@@ -80,7 +102,7 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
         }
         this.keyRearrange = keyRearrange;
 
-        List<String> trimmedPrimaryKeys = table.schema().trimmedPrimaryKeys();
+        List<String> trimmedPrimaryKeys = schema.trimmedPrimaryKeys();
         ProjectedRow trimmedKeyRearrange = null;
         if (!trimmedPrimaryKeys.equals(joinKey)) {
             trimmedKeyRearrange =
@@ -115,9 +137,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
         if (keyRearrange != null) {
             adjustedKey = keyRearrange.replaceRow(adjustedKey);
         }
-        extractor.setRecord(adjustedKey);
-        int bucket = extractor.bucket();
-        BinaryRow partition = extractor.partition();
+
+        BinaryRow partition = partitionFromPk.apply(adjustedKey);
+        Integer numBuckets = queryExecutor.numBuckets(partition);
+        if (numBuckets == null) {
+            // no data in this partition yet, so there is nothing to join
+            return Collections.emptyList();
+        }
+        int bucket = bucket(numBuckets, adjustedKey);
 
         InternalRow trimmedKey = key;
         if (trimmedKeyRearrange != null) {
@@ -132,6 +159,12 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
         }
     }
 
+    private int bucket(int numBuckets, InternalRow primaryKey) {
+        BinaryRow bucketKey = bucketKeyFromPk.apply(primaryKey);
+        return KeyAndBucketExtractor.bucket(
+                KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
+    }
+
     @Override
     public void refresh() {
         queryExecutor.refresh();
@@ -182,6 +215,9 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
 
     interface QueryExecutor extends Closeable {
 
+        @Nullable
+        Integer numBuckets(BinaryRow partition);
+
         InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException;
 
         void refresh();
@@ -195,6 +231,9 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
         private final StreamTableScan scan;
         private final String tableName;
 
+        private final Integer defaultNumBuckets;
+        private final Map<BinaryRow, Integer> numBuckets;
+
         private LocalQueryExecutor(
                 FileStoreTable table,
                 int[] projection,
@@ -222,6 +261,14 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
                             .newStreamScan();
 
             this.tableName = table.name();
+            this.defaultNumBuckets = table.bucketSpec().getNumBuckets();
+            this.numBuckets = new HashMap<>();
+        }
+
+        @Override
+        @Nullable
+        public Integer numBuckets(BinaryRow partition) {
+            return numBuckets.get(partition);
         }
 
         @Override
@@ -241,16 +288,30 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
                 }
 
                 for (Split split : splits) {
-                    BinaryRow partition = ((DataSplit) split).partition();
-                    int bucket = ((DataSplit) split).bucket();
-                    List<DataFileMeta> before = ((DataSplit) 
split).beforeFiles();
-                    List<DataFileMeta> after = ((DataSplit) split).dataFiles();
-
-                    tableQuery.refreshFiles(partition, bucket, before, after);
+                    refreshSplit((DataSplit) split);
                 }
             }
         }
 
+        @VisibleForTesting
+        void refreshSplit(DataSplit split) {
+            BinaryRow partition = split.partition();
+            int bucket = split.bucket();
+            List<DataFileMeta> before = split.beforeFiles();
+            List<DataFileMeta> after = split.dataFiles();
+
+            tableQuery.refreshFiles(partition, bucket, before, after);
+            Integer totalBuckets = split.totalBuckets();
+            if (totalBuckets == null) {
+                // Just for compatibility with older versions
+                checkArgument(
+                        defaultNumBuckets > 0,
+                        "This is a bug, old version table numBuckets should be 
greater than 0.");
+                totalBuckets = defaultNumBuckets;
+            }
+            numBuckets.put(partition, totalBuckets);
+        }
+
         @Override
         public void close() throws IOException {
             tableQuery.close();
@@ -273,9 +334,22 @@ public class PrimaryKeyPartialLookupTable implements LookupTable {
     static class RemoteQueryExecutor implements QueryExecutor {
 
         private final RemoteTableQuery tableQuery;
+        private final Integer numBuckets;
 
         private RemoteQueryExecutor(FileStoreTable table, int[] projection) {
             this.tableQuery = new RemoteTableQuery(table).withValueProjection(projection);
+            int numBuckets = table.bucketSpec().getNumBuckets();
+            if (numBuckets == POSTPONE_BUCKET) {
+                throw new UnsupportedOperationException(
+                        "Remote query does not support POSTPONE_BUCKET.");
+            }
+            this.numBuckets = numBuckets;
+        }
+
+        @Override
+        @Nullable
+        public Integer numBuckets(BinaryRow partition) {
+            return numBuckets;
         }
 
         @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 81cced9a74..ca80129a51 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -38,6 +38,7 @@ import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.BucketSpec;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -86,6 +87,7 @@ import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGN
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_ALIGNMENT_UPDATE_INTERVAL;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_EMIT_STRATEGY;
 import static org.apache.paimon.flink.FlinkConnectorOptions.SCAN_WATERMARK_IDLE_TIMEOUT;
+import static org.apache.paimon.table.BucketMode.POSTPONE_BUCKET;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /**
@@ -409,7 +411,9 @@ public abstract class BaseDataTableSource extends FlinkTableSource
 
     private boolean supportBucketShufflePartitioner(
             List<String> joinKeyFieldNames, List<String> bucketKeyFieldNames) {
-        return BucketMode.HASH_FIXED.equals(((FileStoreTable) table).bucketMode())
+        BucketSpec bucketSpec = ((FileStoreTable) table).bucketSpec();
+        return bucketSpec.getBucketMode() == BucketMode.HASH_FIXED
+                && bucketSpec.getNumBuckets() != POSTPONE_BUCKET
                 && new 
HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames);
     }
 }
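
The BaseDataTableSource change above also keeps the bucket shuffle partitioner
disabled while a table's bucket count is still POSTPONE_BUCKET, since there is
no concrete bucket count to shuffle by until the partitions are compacted or
rescaled. A condensed view of the guard (names as in the diff above; the local
variable "supported" merely stands in for the method's return value):

    BucketSpec bucketSpec = ((FileStoreTable) table).bucketSpec();
    boolean supported =
            bucketSpec.getBucketMode() == BucketMode.HASH_FIXED
                    // postpone tables have no fixed bucket count yet
                    && bucketSpec.getNumBuckets() != POSTPONE_BUCKET
                    && new HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames);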
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
index 3519f162d0..db87abc7ad 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PostponeBucketTableITCase.java
@@ -425,6 +425,131 @@ public class PostponeBucketTableITCase extends AbstractTestBase {
                 .containsExactlyInAnyOrder("+I[5]");
     }
 
+    @Timeout(TIMEOUT)
+    @Test
+    public void testLookupPostponeBucketTable() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment bEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        String createCatalogSql =
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")";
+        bEnv.executeSql(createCatalogSql);
+        bEnv.executeSql("USE CATALOG mycat");
+        bEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k) NOT ENFORCED\n"
+                        + ") WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+        bEnv.executeSql("CREATE TABLE SRC (i INT, `proctime` AS PROCTIME())");
+
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(1)
+                        .checkpointIntervalMs(200)
+                        .build();
+        sEnv.executeSql(createCatalogSql);
+        sEnv.executeSql("USE CATALOG mycat");
+        TableResult streamingSelect =
+                sEnv.executeSql(
+                        "SELECT i, v FROM SRC LEFT JOIN T "
+                                + "FOR SYSTEM_TIME AS OF SRC.proctime AS D ON 
SRC.i = D.k");
+
+        JobClient client = streamingSelect.getJobClient().get();
+        CloseableIterator<Row> it = streamingSelect.collect();
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 10), (2, 20), (3, 
30)").await();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+        // lookup join
+        bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await();
+        assertThat(collect(client, it, 3))
+                .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 
30]");
+
+        // rescale and re-join
+        bEnv.executeSql("CALL sys.rescale(`table` => 'default.T', `bucket_num` 
=> 5)").await();
+        bEnv.executeSql("INSERT INTO SRC VALUES (1), (2), (3)").await();
+        assertThat(collect(client, it, 3))
+                .containsExactlyInAnyOrder("+I[1, 10]", "+I[2, 20]", "+I[3, 
30]");
+
+        it.close();
+    }
+
+    @Timeout(TIMEOUT)
+    @Test
+    public void testLookupPostponeBucketPartitionedTable() throws Exception {
+        String warehouse = getTempDirPath();
+        TableEnvironment bEnv =
+                tableEnvironmentBuilder()
+                        .batchMode()
+                        .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                        .build();
+
+        String createCatalogSql =
+                "CREATE CATALOG mycat WITH (\n"
+                        + "  'type' = 'paimon',\n"
+                        + "  'warehouse' = '"
+                        + warehouse
+                        + "'\n"
+                        + ")";
+        bEnv.executeSql(createCatalogSql);
+        bEnv.executeSql("USE CATALOG mycat");
+        bEnv.executeSql(
+                "CREATE TABLE T (\n"
+                        + "  k INT,\n"
+                        + "  pt INT,\n"
+                        + "  v INT,\n"
+                        + "  PRIMARY KEY (k, pt) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (pt) WITH (\n"
+                        + "  'bucket' = '-2'\n"
+                        + ")");
+        bEnv.executeSql("CREATE TABLE SRC (i INT, pt INT, `proctime` AS 
PROCTIME())");
+
+        TableEnvironment sEnv =
+                tableEnvironmentBuilder()
+                        .streamingMode()
+                        .parallelism(1)
+                        .checkpointIntervalMs(200)
+                        .build();
+        sEnv.executeSql(createCatalogSql);
+        sEnv.executeSql("USE CATALOG mycat");
+        TableResult streamingSelect =
+                sEnv.executeSql(
+                        "SELECT i, D.pt, v FROM SRC LEFT JOIN T "
+                                + "FOR SYSTEM_TIME AS OF SRC.proctime AS D ON 
SRC.i = D.k AND SRC.pt = D.pt");
+
+        JobClient client = streamingSelect.getJobClient().get();
+        CloseableIterator<Row> it = streamingSelect.collect();
+
+        bEnv.executeSql("INSERT INTO T VALUES (1, 1, 10), (2, 2, 20), (3, 2, 
30)").await();
+        bEnv.executeSql("CALL sys.compact(`table` => 'default.T')").await();
+
+        // rescale the partitions to different bucket counts, then lookup join
+        bEnv.executeSql(
+                        "CALL sys.rescale(`table` => 'default.T', `bucket_num` 
=> 5, `partition` => 'pt=1')")
+                .await();
+        bEnv.executeSql(
+                        "CALL sys.rescale(`table` => 'default.T', `bucket_num` 
=> 8, `partition` => 'pt=2')")
+                .await();
+        bEnv.executeSql("INSERT INTO SRC VALUES (1, 1), (2, 2), (3, 
2)").await();
+        assertThat(collect(client, it, 3))
+                .containsExactlyInAnyOrder("+I[1, 1, 10]", "+I[2, 2, 20]", 
"+I[3, 2, 30]");
+
+        it.close();
+    }
+
     private List<String> collect(TableResult result) throws Exception {
         List<String> ret = new ArrayList<>();
         try (CloseableIterator<Row> it = result.collect()) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index dbf9f99a32..dcbc405d31 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
 import org.apache.paimon.types.RowType;
@@ -49,6 +50,7 @@ import org.junit.jupiter.api.io.TempDir;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
+import java.lang.reflect.Field;
 import java.net.InetSocketAddress;
 import java.nio.file.Path;
 import java.time.Duration;
@@ -62,6 +64,7 @@ import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.apache.paimon.data.BinaryRow.EMPTY_ROW;
 import static org.apache.paimon.flink.FlinkConnectorOptions.LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST;
 import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
 import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
@@ -148,6 +151,26 @@ public class FileStoreLookupFunctionTest {
         }
     }
 
+    @Test
+    public void testCompatibilityForOldVersion() throws Exception {
+        createLookupFunction(false, true, false, false);
+        commit(writeCommit(1));
+        PrimaryKeyPartialLookupTable lookupTable =
+                (PrimaryKeyPartialLookupTable) lookupFunction.lookupTable();
+        LocalQueryExecutor queryExecutor = (LocalQueryExecutor) lookupTable.queryExecutor();
+
+        // set totalBuckets to null to simulate a split written by an older version
+        DataSplit split = (DataSplit) table.newReadBuilder().newScan().plan().splits().get(0);
+        Field field = DataSplit.class.getDeclaredField("totalBuckets");
+        field.setAccessible(true);
+        field.set(split, null);
+        assertThat(split.totalBuckets()).isNull();
+
+        // the bucket count should fall back to the table default of 2
+        queryExecutor.refreshSplit(split);
+        assertThat(queryExecutor.numBuckets(EMPTY_ROW)).isEqualTo(2);
+    }
+
     @ParameterizedTest
     @ValueSource(booleans = {false, true})
     public void testDefaultLocalPartial(boolean refreshAsync) throws Exception {
