This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new df3033c30c [flink] Support custom lookup shuffle for fixed bucket 
table (#5478)
df3033c30c is described below

commit df3033c30cbe5de8079b9aba910e0b5f2d0059ea
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Fri Apr 18 11:21:14 2025 +0800

    [flink] Support custom lookup shuffle for fixed bucket table (#5478)
---
 docs/content/flink/sql-lookup.md                   |  17 ++
 .../paimon/flink/utils/RuntimeContextUtils.java    |  32 ---
 .../paimon/flink/utils/RuntimeContextUtils.java    |  32 ---
 .../paimon/flink/utils/RuntimeContextUtils.java    |  32 ---
 .../flink/lookup/FileStoreLookupFunction.java      |  26 +-
 .../lookup/partitioner/BucketIdExtractor.java      | 100 +++++++
 .../partitioner/BucketShufflePartitioner.java      |  46 +++
 .../lookup/partitioner/BucketShuffleStrategy.java  | 151 ++++++++++
 .../flink/lookup/partitioner/ShuffleStrategy.java  |  48 ++++
 .../paimon/flink/source/BaseDataTableSource.java   |  57 +++-
 .../paimon/flink/lookup/BucketIdExtractorTest.java | 257 +++++++++++++++++
 .../flink/lookup/BucketShufflePartitionerTest.java | 215 ++++++++++++++
 .../flink/lookup/BucketShuffleStrategyTest.java    | 103 +++++++
 .../flink/lookup/FileStoreLookupFunctionTest.java  |   6 +-
 .../lookup/LookupJoinBucketShuffleITCase.java      | 312 +++++++++++++++++++++
 paimon-flink/paimon-flink1-common/pom.xml          |   6 +
 .../abilities/SupportsLookupCustomShuffle.java     |  81 ++++++
 .../paimon/flink/utils/RuntimeContextUtils.java    |  16 ++
 paimon-flink/paimon-flink2-common/pom.xml          |   6 +
 .../paimon/flink/utils/RuntimeContextUtils.java    |  16 ++
 20 files changed, 1456 insertions(+), 103 deletions(-)

diff --git a/docs/content/flink/sql-lookup.md b/docs/content/flink/sql-lookup.md
index d28b7333ce..3b160dbfc9 100644
--- a/docs/content/flink/sql-lookup.md
+++ b/docs/content/flink/sql-lookup.md
@@ -120,6 +120,23 @@ your streaming job may be blocked. You can try to use 
`audit_log` system table f
 (convert CDC stream to append stream).
 {{< /hint >}}
 
+## Large Scale Lookup (Fixed Bucket)
+
+By default, each Flink subtask would store a whole copy of the lookup table. 
If the amount of data in `customers` 
+(lookup table) is too large for a single subtask, you can enable the shuffle 
lookup optimization as follows 
+(For Flink 2.0+ and fixed-bucket Paimon table). This optimization enables 
sending data of the same bucket to designated 
+subtask(s), so each Flink subtask would only need to store a part of the whole 
data.
+
+```sql
+-- enrich each order with customer information
+SELECT /*+ LOOKUP('table'='c', 'shuffle'='true') */
+o.order_id, o.total, c.country, c.zip
+FROM orders AS o
+JOIN customers
+FOR SYSTEM_TIME AS OF o.proc_time AS c
+ON o.customer_id = c.id;
+```
+
 ## Dynamic Partition
 
 In traditional data warehouses, each partition often maintains the latest full 
data, so this partition table only 
diff --git 
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
deleted file mode 100644
index 460fea55ad..0000000000
--- 
a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/** Utility methods about Flink runtime context to resolve compatibility 
issues. */
-public class RuntimeContextUtils {
-    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
-        return context.getNumberOfParallelSubtasks();
-    }
-
-    public static int getIndexOfThisSubtask(RuntimeContext context) {
-        return context.getIndexOfThisSubtask();
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
deleted file mode 100644
index 460fea55ad..0000000000
--- 
a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/** Utility methods about Flink runtime context to resolve compatibility 
issues. */
-public class RuntimeContextUtils {
-    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
-        return context.getNumberOfParallelSubtasks();
-    }
-
-    public static int getIndexOfThisSubtask(RuntimeContext context) {
-        return context.getIndexOfThisSubtask();
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
deleted file mode 100644
index 460fea55ad..0000000000
--- 
a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/** Utility methods about Flink runtime context to resolve compatibility 
issues. */
-public class RuntimeContextUtils {
-    public static int getNumberOfParallelSubtasks(RuntimeContext context) {
-        return context.getNumberOfParallelSubtasks();
-    }
-
-    public static int getIndexOfThisSubtask(RuntimeContext context) {
-        return context.getIndexOfThisSubtask();
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index aff57c6c36..fd8b629507 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -25,6 +25,8 @@ import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
 import org.apache.paimon.flink.FlinkRowData;
 import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
@@ -33,6 +35,7 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.OutOfRangeException;
 import org.apache.paimon.utils.FileIOUtils;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Preconditions;
 
 import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
 
@@ -84,6 +87,7 @@ public class FileStoreLookupFunction implements Serializable, 
Closeable {
     private final List<String> joinKeys;
     @Nullable private final Predicate predicate;
     @Nullable private final RefreshBlacklist refreshBlacklist;
+    @Nullable private final ShuffleStrategy strategy;
 
     private final List<InternalRow.FieldGetter> projectFieldsGetters;
 
@@ -103,7 +107,8 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
             FileStoreTable table,
             int[] projection,
             int[] joinKeyIndex,
-            @Nullable Predicate predicate) {
+            @Nullable Predicate predicate,
+            @Nullable ShuffleStrategy strategy) {
         if (!TableScanUtils.supportCompactDiffStreamingReading(table)) {
             TableScanUtils.streamingReadingValidate(table);
         }
@@ -143,6 +148,8 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
         this.refreshBlacklist =
                 RefreshBlacklist.create(
                         
table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
+
+        this.strategy = strategy;
     }
 
     public void open(FunctionContext context) throws Exception {
@@ -404,8 +411,21 @@ public class FileStoreLookupFunction implements 
Serializable, Closeable {
      * @return the set of bucket IDs to be cached
      */
     protected Set<Integer> getRequireCachedBucketIds() {
-        // TODO: Implement the method when Flink support bucket shuffle for 
lookup join.
-        return null;
+        if (strategy == null) {
+            return null;
+        }
+        @Nullable
+        Integer indexOfThisSubtask = 
RuntimeContextUtils.getIndexOfThisSubtask(functionContext);
+        @Nullable
+        Integer numberOfParallelSubtasks =
+                
RuntimeContextUtils.getNumberOfParallelSubtasks(functionContext);
+        if (indexOfThisSubtask == null) {
+            Preconditions.checkState(numberOfParallelSubtasks == null);
+            return null;
+        } else {
+            Preconditions.checkState(numberOfParallelSubtasks != null);
+        }
+        return strategy.getRequiredCacheBucketIds(indexOfThisSubtask, 
numberOfParallelSubtasks);
     }
 
     protected void setCacheRowFilter(@Nullable Filter<InternalRow> 
cacheRowFilter) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketIdExtractor.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketIdExtractor.java
new file mode 100644
index 0000000000..ad0bb1f55d
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketIdExtractor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * {@link BucketIdExtractor} is used to extract the bucket id from join keys. 
It requires that the
+ * join keys contain all bucket keys and projects the bucket key fields from 
the given join key row.
+ */
+public class BucketIdExtractor implements Serializable {
+
+    private final int numBuckets;
+
+    private final TableSchema tableSchema;
+
+    private final List<String> joinKeyFieldNames;
+
+    private final List<String> bucketKeyFieldNames;
+
+    private Projection bucketKeyProjection;
+
+    public BucketIdExtractor(
+            int numBuckets,
+            TableSchema tableSchema,
+            List<String> joinKeyFieldNames,
+            List<String> bucketKeyFieldNames) {
+        checkState(
+                new 
HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames),
+                "The join keys must contain all bucket keys.");
+        checkState(numBuckets > 0, "Number of buckets should be positive.");
+        this.numBuckets = numBuckets;
+        this.joinKeyFieldNames = joinKeyFieldNames;
+        this.bucketKeyFieldNames = bucketKeyFieldNames;
+        this.tableSchema = tableSchema;
+    }
+
+    public int extractBucketId(RowData joinKeyRow) {
+        checkState(joinKeyRow.getArity() == joinKeyFieldNames.size());
+        if (bucketKeyProjection == null) {
+            bucketKeyProjection = generateBucketKeyProjection();
+        }
+        FlinkRowWrapper internalRow = new FlinkRowWrapper(joinKeyRow);
+        BinaryRow bucketKey = bucketKeyProjection.apply(internalRow);
+        int bucket =
+                KeyAndBucketExtractor.bucket(
+                        KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), 
numBuckets);
+        checkState(bucket < numBuckets);
+        return bucket;
+    }
+
+    private Projection generateBucketKeyProjection() {
+        int[] bucketKeyIndexes =
+                
bucketKeyFieldNames.stream().mapToInt(joinKeyFieldNames::indexOf).toArray();
+        List<DataField> joinKeyDataFields =
+                joinKeyFieldNames.stream()
+                        .map(
+                                joinKeyFieldName ->
+                                        tableSchema
+                                                .fields()
+                                                .get(
+                                                        tableSchema
+                                                                .fieldNames()
+                                                                
.indexOf(joinKeyFieldName)))
+                        .collect(Collectors.toList());
+        return CodeGenUtils.newProjection(new RowType(joinKeyDataFields), 
bucketKeyIndexes);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShufflePartitioner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShufflePartitioner.java
new file mode 100644
index 0000000000..20ef5e8c5a
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShufflePartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import 
org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle.InputDataPartitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.MathUtils;
+
+/**
+ * {@link BucketShufflePartitioner} class partitions rows based on the bucket 
id. It uses a custom
+ * strategy and an extractor to determine the target partition for a given set 
of join keys.
+ */
+public class BucketShufflePartitioner implements InputDataPartitioner {
+
+    private final ShuffleStrategy strategy;
+
+    private final BucketIdExtractor extractor;
+
+    public BucketShufflePartitioner(ShuffleStrategy strategy, 
BucketIdExtractor extractor) {
+        this.strategy = strategy;
+        this.extractor = extractor;
+    }
+
+    @Override
+    public int partition(RowData joinKeys, int numPartitions) {
+        int bucketId = extractor.extractBucketId(joinKeys);
+        int joinKeyHash = MathUtils.murmurHash(joinKeys.hashCode());
+        return strategy.getTargetSubtaskId(bucketId, joinKeyHash, 
numPartitions);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShuffleStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShuffleStrategy.java
new file mode 100644
index 0000000000..4bf02a19e9
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShuffleStrategy.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * {@link BucketShuffleStrategy} defines a strategy for determining the target 
subtask id for a
+ * given bucket id and join key hash. It also provides a method to retrieve 
the set of bucket ids
+ * that are required to be cached by a specific subtask.
+ */
+public class BucketShuffleStrategy implements ShuffleStrategy {
+
+    private final int numBuckets;
+
+    public BucketShuffleStrategy(int numBuckets) {
+        checkState(numBuckets > 0, "Number of buckets should be positive.");
+        this.numBuckets = numBuckets;
+    }
+
+    /**
+     * Determines the target subtask ID for a given bucket ID and join key 
hash.
+     *
+     * <p>The method uses two different strategies based on the comparison 
between the number of
+     * buckets and the number of subtasks:
+     *
+     * <p>1. If the number of buckets is greater than or equal to the number 
of subtasks (numBuckets
+     * >= numSubtasks), then the target subtask ID is determined by assigning 
buckets to subtasks in
+     * a round-robin manner:
+     *
+     * {@code
+     * e.g., numBuckets = 5, numSubtasks = 3
+     * Bucket 0 -> Subtask 0
+     * Bucket 1 -> Subtask 1
+     * Bucket 2 -> Subtask 2
+     * Bucket 3 -> Subtask 0
+     * Bucket 4 -> Subtask 1
+     *
+     * }
+     *
+     * <p>2. If the number of buckets is less than the number of subtasks 
(numBuckets <
+     * numSubtasks), then the target subtask ID is determined by distributing 
subtasks to buckets in
+     * a round-robin manner:
+     *
+     * {@code
+     * e.g., numBuckets = 2, numSubtasks = 5
+     * Bucket 0 -> Subtask 0,2,4
+     * Bucket 1 -> Subtask 1,3
+     *
+     * }
+     *
+     * @param bucketId The ID of the bucket.
+     * @param joinKeyHash The hash of the join key.
+     * @param numSubtasks The total number of target subtasks.
+     * @return The target subtask ID for the given bucket ID and join key hash.
+     */
+    public int getTargetSubtaskId(int bucketId, int joinKeyHash, int 
numSubtasks) {
+        if (numBuckets >= numSubtasks) {
+            return bucketId % numSubtasks;
+        } else {
+            // Calculate the number of ranges to group the subtasks
+            int bucketNumMultipleRange = numSubtasks / numBuckets;
+            int adjustedBucketNumMultipleRange;
+            // If the start index of the current bucket plus the total range 
exceeds
+            // the number of subtasks, use the calculated range without 
adjustment.
+            // Example: bucketId = 1, numSubtasks = 4, numBuckets = 2
+            if ((bucketId + bucketNumMultipleRange * numBuckets) >= 
numSubtasks) {
+                adjustedBucketNumMultipleRange = bucketNumMultipleRange;
+            } else {
+                // Otherwise, increment the range by one to ensure all 
subtasks are included.
+                // Example: bucketId = 0, numSubtasks = 5, numBuckets = 2
+                adjustedBucketNumMultipleRange = bucketNumMultipleRange + 1;
+            }
+            return (joinKeyHash % adjustedBucketNumMultipleRange) * numBuckets 
+ bucketId;
+        }
+    }
+
+    /**
+     * Retrieves the set of bucket IDs that are required to be cached by a 
specific subtask. This
+     * method uses the same strategy as {@link #getTargetSubtaskId(int, int, 
int)} to ensure
+     * consistency in bucket to subtask mapping.
+     *
+     * <p>The method uses two different strategies based on the comparison 
between the number of
+     * buckets and the number of subtasks:
+     *
+     * <p>1. If the number of buckets is greater than or equal to the number 
of subtasks (numBuckets
+     * >= numSubtasks), then each subtask caches buckets in a round-robin 
manner.
+     *
+     * {@code
+     * e.g., numBuckets = 5, numSubtasks = 3
+     * Subtask 0 -> Bucket 0,3
+     * Subtask 1 -> Bucket 1,4
+     * Subtask 2 -> Bucket 2
+     *
+     * }
+     *
+     * <p>2. If the number of buckets is less than the number of subtasks 
(numBuckets <
+     * numSubtasks), then each subtask caches buckets by polling from buckets 
in a round-robin
+     * manner:
+     *
+     * {@code
+     * e.g., numBuckets = 2, numSubtasks = 5
+     * Subtask 0 -> Bucket 0
+     * Subtask 1 -> Bucket 1
+     * Subtask 2 -> Bucket 0
+     * Subtask 3 -> Bucket 1
+     * Subtask 4 -> Bucket 0
+     *
+     * }
+     *
+     * @param subtaskId The ID of the subtask.
+     * @param numSubtasks The total number of subtasks.
+     * @return The set of bucket IDs that are required to be cached by the 
specified subtask.
+     */
+    public Set<Integer> getRequiredCacheBucketIds(int subtaskId, int 
numSubtasks) {
+        Set<Integer> requiredCacheBucketIds = new HashSet<>();
+        if (numBuckets >= numSubtasks) {
+            for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+                if (bucketId % numSubtasks == subtaskId) {
+                    requiredCacheBucketIds.add(bucketId);
+                }
+            }
+        } else {
+            for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+                if (bucketId == subtaskId % numBuckets) {
+                    requiredCacheBucketIds.add(bucketId);
+                }
+            }
+        }
+        return requiredCacheBucketIds;
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/ShuffleStrategy.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/ShuffleStrategy.java
new file mode 100644
index 0000000000..2917777afe
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/ShuffleStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Shuffle strategy use to assign bucket to subtask and get all the required 
buckets for a given
+ * task.
+ */
+public interface ShuffleStrategy extends Serializable {
+
+    /**
+     * Calculates and returns the subtask ID that a given bucket should be 
assigned to.
+     *
+     * @param bucketId The ID of the bucket.
+     * @param joinKeyHash The hash value of the join key.
+     * @param numSubtasks The total number of subtasks.
+     * @return The subtask ID that the bucket should be assigned to.
+     */
+    int getTargetSubtaskId(int bucketId, int joinKeyHash, int numSubtasks);
+
+    /**
+     * Returns all the bucket IDs required for a given subtask.
+     *
+     * @param subtaskId The ID of the subtask.
+     * @param numSubtasks The total number of subtasks.
+     * @return A set containing all the bucket IDs required for the subtask.
+     */
+    Set<Integer> getRequiredCacheBucketIds(int subtaskId, int numSubtasks);
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 77de76d867..81cced9a74 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -28,10 +28,16 @@ import org.apache.paimon.flink.log.LogSourceProvider;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
 import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
+import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
+import org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
+import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
+import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
@@ -46,20 +52,25 @@ import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.source.LookupTableSource;
 import org.apache.flink.table.connector.source.SourceProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle;
 import 
org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.AggregateExpression;
 import org.apache.flink.table.factories.DynamicTableFactory;
 import org.apache.flink.table.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
@@ -82,7 +93,11 @@ import static 
org.apache.paimon.utils.Preconditions.checkNotNull;
  * batch mode or streaming mode.
  */
 public abstract class BaseDataTableSource extends FlinkTableSource
-        implements LookupTableSource, SupportsWatermarkPushDown, 
SupportsAggregatePushDown {
+        implements LookupTableSource,
+                SupportsWatermarkPushDown,
+                SupportsAggregatePushDown,
+                SupportsLookupCustomShuffle {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BaseDataTableSource.class);
 
     private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
             Arrays.asList(
@@ -98,7 +113,7 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
     protected final boolean unbounded;
     protected final DynamicTableFactory.Context context;
     @Nullable protected final LogStoreTableFactory logStoreTableFactory;
-
+    @Nullable private BucketShufflePartitioner bucketShufflePartitioner;
     @Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
     @Nullable protected Long countPushed;
 
@@ -276,7 +291,32 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
 
     protected FileStoreLookupFunction getFileStoreLookupFunction(
             LookupContext context, FileStoreTable table, int[] projection, 
int[] joinKey) {
-        return new FileStoreLookupFunction(table, projection, joinKey, 
predicate);
+        // 1.Get the join key field names from join key indexes.
+        List<String> joinKeyFieldNames =
+                Arrays.stream(joinKey)
+                        .mapToObj(i -> 
table.rowType().getFieldNames().get(projection[i]))
+                        .collect(Collectors.toList());
+        // 2.Get the bucket key field names from bucket key indexes.
+        List<String> bucketKeyFieldNames = table.schema().bucketKeys();
+        boolean useCustomShuffle =
+                supportBucketShufflePartitioner(joinKeyFieldNames, 
bucketKeyFieldNames)
+                        && RuntimeContextUtils.preferCustomShuffle(context);
+        int numBuckets;
+        ShuffleStrategy strategy = null;
+        if (useCustomShuffle) {
+            numBuckets = table.store().options().bucket();
+            BucketIdExtractor extractor =
+                    new BucketIdExtractor(
+                            numBuckets, table.schema(), joinKeyFieldNames, 
bucketKeyFieldNames);
+
+            strategy = new BucketShuffleStrategy(numBuckets);
+            bucketShufflePartitioner = new BucketShufflePartitioner(strategy, 
extractor);
+        }
+
+        if (strategy != null) {
+            LOG.info("Paimon connector is using bucket shuffle partitioning 
strategy.");
+        }
+        return new FileStoreLookupFunction(table, projection, joinKey, 
predicate, strategy);
     }
 
     private FileStoreTable timeTravelDisabledTable(FileStoreTable table) {
@@ -361,4 +401,15 @@ public abstract class BaseDataTableSource extends 
FlinkTableSource
     public boolean isUnbounded() {
         return unbounded;
     }
+
+    @Override
+    public Optional<InputDataPartitioner> getPartitioner() {
+        return Optional.ofNullable(bucketShufflePartitioner);
+    }
+
+    private boolean supportBucketShufflePartitioner(
+            List<String> joinKeyFieldNames, List<String> bucketKeyFieldNames) {
+        return BucketMode.HASH_FIXED.equals(((FileStoreTable) 
table).bucketMode())
+                && new 
HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames);
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketIdExtractorTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketIdExtractorTest.java
new file mode 100644
index 0000000000..c5c2759c26
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketIdExtractorTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The test for {@link BucketIdExtractor}. */
+public class BucketIdExtractorTest extends CatalogITCaseBase {
+
+    private static final int BUCKET_NUMBER = 5;
+
+    private static final int COL_NUMBER = 5;
+
+    private static final int ROW_NUMBER = 100;
+
+    @Test
+    public void testJoinKeyEqualToSingleBucketKey() throws Exception {
+        Random seed = new Random();
+        int bucketId = seed.nextInt(COL_NUMBER);
+        int bucketKeyIndex = seed.nextInt(5);
+        String bucketKeyName = "col" + (bucketKeyIndex + 1);
+        FileStoreTable table = createTestTable(bucketKeyName);
+        List<List<Object>> bucketKeyRows =
+                getGroundTruthBucketKeyRows(
+                        table,
+                        bucketId,
+                        
createBucketKeyGetter(Collections.singletonList(bucketKeyIndex)));
+        List<String> joinKeyNames = Collections.singletonList(bucketKeyName);
+        List<RowData> joinKeyRows =
+                generateJoinKeyRows(
+                        bucketKeyRows, bucketKeyRow -> 
GenericRowData.of(bucketKeyRow.get(0)));
+        BucketIdExtractor bucketIdExtractor =
+                new BucketIdExtractor(BUCKET_NUMBER, table.schema(), 
joinKeyNames, joinKeyNames);
+        checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
+    }
+
+    @Test
+    public void testJoinKeysContainSingleKey() throws Exception {
+        Random seed = new Random();
+        List<List<Integer>> joinKeyIndexes = getColumnIndexCombinations();
+        int bucketId = seed.nextInt(COL_NUMBER);
+        List<Integer> joinKeyIndex = 
joinKeyIndexes.get(seed.nextInt(joinKeyIndexes.size()));
+        int bucketKeyIndex = joinKeyIndex.get(1);
+        String bucketKeyName = "col" + bucketKeyIndex;
+        FileStoreTable table = createTestTable(bucketKeyName);
+        List<List<Object>> bucketKeyRows =
+                getGroundTruthBucketKeyRows(
+                        table,
+                        bucketId,
+                        
createBucketKeyGetter(Collections.singletonList(bucketKeyIndex - 1)));
+        List<String> bucketKeyNames = Collections.singletonList(bucketKeyName);
+        List<String> joinKeyNames = Arrays.asList("col" + joinKeyIndex.get(0), 
bucketKeyName);
+        List<RowData> joinKeyRows =
+                generateJoinKeyRows(
+                        bucketKeyRows,
+                        bucketKeyRow ->
+                                GenericRowData.of(
+                                        
generateFakeColumnValue(joinKeyIndex.get(0)),
+                                        bucketKeyRow.get(0)));
+        BucketIdExtractor bucketIdExtractor =
+                new BucketIdExtractor(BUCKET_NUMBER, table.schema(), 
joinKeyNames, bucketKeyNames);
+        checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
+    }
+
+    @Test
+    public void testJoinKeysEqualToMultiBucketKeys() throws Exception {
+        Random seed = new Random();
+        List<List<Integer>> bucketKeyIndexes = getColumnIndexCombinations();
+        int bucketId = seed.nextInt(COL_NUMBER);
+        List<Integer> bucketKeyIndex = 
bucketKeyIndexes.get(seed.nextInt(bucketKeyIndexes.size()));
+        String bucketKeyName = "col" + bucketKeyIndex.get(0) + ",col" + 
bucketKeyIndex.get(1);
+        FileStoreTable table = createTestTable(bucketKeyName);
+        List<List<Object>> bucketKeyRows =
+                getGroundTruthBucketKeyRows(
+                        table,
+                        bucketId,
+                        createBucketKeyGetter(
+                                Arrays.asList(
+                                        bucketKeyIndex.get(0) - 1, 
bucketKeyIndex.get(1) - 1)));
+        List<String> joinKeyNames = Arrays.asList(bucketKeyName.split(","));
+        List<RowData> joinKeyRows =
+                generateJoinKeyRows(
+                        bucketKeyRows,
+                        bucketKeyRow ->
+                                GenericRowData.of(bucketKeyRow.get(0), 
bucketKeyRow.get(1)));
+        BucketIdExtractor bucketIdExtractor =
+                new BucketIdExtractor(BUCKET_NUMBER, table.schema(), 
joinKeyNames, joinKeyNames);
+        checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
+    }
+
+    private List<RowData> generateJoinKeyRows(
+            List<List<Object>> bucketKeyRows, Function<List<Object>, RowData> 
converter) {
+        List<RowData> joinKeyRows = new ArrayList<>();
+        for (List<Object> bucketKeyRow : bucketKeyRows) {
+            joinKeyRows.add(converter.apply(bucketKeyRow));
+        }
+        return joinKeyRows;
+    }
+
+    private void checkCorrectness(
+            BucketIdExtractor extractor, int targetBucketId, List<RowData> 
joinKeyRows) {
+        for (RowData joinKeyRow : joinKeyRows) {
+            
assertThat(extractor.extractBucketId(joinKeyRow)).isEqualTo(targetBucketId);
+        }
+    }
+
+    private List<List<Object>> getGroundTruthBucketKeyRows(
+            FileStoreTable table, int bucketId, Function<InternalRow, 
List<Object>> bucketKeyGetter)
+            throws IOException {
+        List<ManifestEntry> files = 
table.store().newScan().withBucket(bucketId).plan().files();
+        List<List<Object>> bucketKeyRows = new ArrayList<>();
+        for (ManifestEntry file : files) {
+            DataSplit dataSplit =
+                    DataSplit.builder()
+                            .withPartition(file.partition())
+                            .withBucket(file.bucket())
+                            
.withDataFiles(Collections.singletonList(file.file()))
+                            .withBucketPath("not used")
+                            .build();
+            table.newReadBuilder()
+                    .newRead()
+                    .createReader(dataSplit)
+                    .forEachRemaining(
+                            internalRow -> {
+                                
bucketKeyRows.add(bucketKeyGetter.apply(internalRow));
+                            });
+        }
+        return bucketKeyRows;
+    }
+
+    private FileStoreTable createTestTable(String bucketKey) throws Exception {
+        String tableName = "Test";
+        String ddl =
+                String.format(
+                        "CREATE TABLE %s (col1 INT, col2 STRING, col3 FLOAT, 
col4 INT, col5 BOOLEAN ) WITH"
+                                + " ('bucket'='%s'",
+                        tableName, BUCKET_NUMBER);
+        if (bucketKey != null) {
+            ddl += ", 'bucket-key' = '" + bucketKey + "')";
+        }
+        batchSql(ddl);
+        Random seed = new Random();
+        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s 
VALUES ", tableName));
+        for (int index = 1; index < ROW_NUMBER; ++index) {
+            dml.append(
+                    String.format(
+                            "(%s, '%s', %s, %s, %s), ",
+                            seed.nextInt(ROW_NUMBER),
+                            seed.nextInt(ROW_NUMBER),
+                            101.1F + seed.nextInt(50),
+                            seed.nextInt(ROW_NUMBER),
+                            (seed.nextBoolean() ? "true" : "false")));
+        }
+        dml.append(
+                String.format(
+                        "(%s, '%s', %s, %s, %s)",
+                        seed.nextInt(ROW_NUMBER),
+                        seed.nextInt(ROW_NUMBER),
+                        101.1F + seed.nextInt(50),
+                        seed.nextInt(ROW_NUMBER),
+                        (seed.nextBoolean() ? "true" : "false")));
+        batchSql(dml.toString());
+        return paimonTable(tableName);
+    }
+
+    private Function<InternalRow, List<Object>> createBucketKeyGetter(
+            List<Integer> bucketKeyIndexes) {
+        return row -> {
+            List<Object> bucketKeys = new ArrayList<>();
+            for (Integer bucketKeyIndex : bucketKeyIndexes) {
+                switch (bucketKeyIndex) {
+                    case 0:
+                        bucketKeys.add(row.getInt(0));
+                        break;
+                    case 1:
+                        
bucketKeys.add(StringData.fromString(row.getString(1).toString()));
+                        break;
+                    case 2:
+                        bucketKeys.add(row.getFloat(2));
+                        break;
+                    case 3:
+                        bucketKeys.add(row.getInt(3));
+                        break;
+                    case 4:
+                        bucketKeys.add(row.getBoolean(4));
+                        break;
+                    default:
+                        throw new UnsupportedOperationException();
+                }
+            }
+            return bucketKeys;
+        };
+    }
+
+    private List<List<Integer>> getColumnIndexCombinations() {
+        List<List<Integer>> bucketIndexes = new ArrayList<>();
+        for (int i = 1; i <= BUCKET_NUMBER; ++i) {
+            for (int j = i + 1; j <= BUCKET_NUMBER; ++j) {
+                bucketIndexes.add(Arrays.asList(i, j));
+            }
+        }
+        return bucketIndexes;
+    }
+
+    private Object generateFakeColumnValue(Integer columnIndex) {
+        switch (columnIndex) {
+            case 0:
+                return 1;
+            case 1:
+                return StringData.fromString("Test");
+            case 2:
+                return 1.21F;
+            case 3:
+                return 10;
+            case 4:
+                return false;
+            default:
+                throw new UnsupportedOperationException();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShufflePartitionerTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShufflePartitionerTest.java
new file mode 100644
index 0000000000..5b26aca25f
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShufflePartitionerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
+import org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
+import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The test for {@link BucketShufflePartitioner}. */
+public class BucketShufflePartitionerTest extends CatalogITCaseBase {
+
+    @Test
+    public void testBucketNumLessThanLookupJoinParallelism() throws Exception {
+        int numBuckets = 100;
+        int lookupJoinParallelism = 27;
+        FileStoreTable table = createTestTable(numBuckets);
+        BucketShuffleStrategy strategy = new BucketShuffleStrategy(numBuckets);
+        BucketShufflePartitioner bucketShufflePartitioner =
+                createBucketShufflePartitioner(table, numBuckets, strategy);
+        List<Tuple2<RowData, Integer>> joinKeysList =
+                getGroundTruthJoinKeysWithBucketId(table, numBuckets);
+        Map<Integer, List<Tuple2<RowData, Integer>>> partitionResult = new 
HashMap<>();
+        for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysList) {
+            int subtaskId =
+                    bucketShufflePartitioner.partition(
+                            joinKeysWithBucketId.f0, lookupJoinParallelism);
+            partitionResult.compute(
+                    subtaskId,
+                    (key, currentValue) -> {
+                        List<Tuple2<RowData, Integer>> newValue =
+                                (currentValue != null) ? currentValue : new 
ArrayList<>();
+                        newValue.add(joinKeysWithBucketId);
+                        return newValue;
+                    });
+        }
+        for (int subtaskId = 0; subtaskId < lookupJoinParallelism; 
++subtaskId) {
+            List<Tuple2<RowData, Integer>> joinKeysWithBucketIdList =
+                    partitionResult.get(subtaskId);
+            Set<Integer> requiredCacheBucketIds =
+                    strategy.getRequiredCacheBucketIds(subtaskId, 
lookupJoinParallelism);
+            for (Tuple2<RowData, Integer> joinKeysWithBucketId : 
joinKeysWithBucketIdList) {
+                
assertThat(requiredCacheBucketIds).contains(joinKeysWithBucketId.f1);
+                assertThat(joinKeysWithBucketId.f1 % 
lookupJoinParallelism).isEqualTo(subtaskId);
+            }
+        }
+    }
+
+    @Test
+    public void testBucketNumEqualToLookupJoinParallelism() throws Exception {
+        int numBuckets = 78;
+        int lookupJoinParallelism = 78;
+        FileStoreTable table = createTestTable(numBuckets);
+        BucketShuffleStrategy strategy = new BucketShuffleStrategy(numBuckets);
+        BucketShufflePartitioner bucketShufflePartitioner =
+                createBucketShufflePartitioner(table, numBuckets, strategy);
+        List<Tuple2<RowData, Integer>> joinKeysList =
+                getGroundTruthJoinKeysWithBucketId(table, numBuckets);
+        Map<Integer, List<Tuple2<RowData, Integer>>> partitionResult = new 
HashMap<>();
+        for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysList) {
+            int subtaskId =
+                    bucketShufflePartitioner.partition(
+                            joinKeysWithBucketId.f0, lookupJoinParallelism);
+            partitionResult.compute(
+                    subtaskId,
+                    (key, currentValue) -> {
+                        List<Tuple2<RowData, Integer>> newValue =
+                                (currentValue != null) ? currentValue : new 
ArrayList<>();
+                        newValue.add(joinKeysWithBucketId);
+                        return newValue;
+                    });
+        }
+        for (int subtaskId = 0; subtaskId < lookupJoinParallelism; 
++subtaskId) {
+            List<Tuple2<RowData, Integer>> joinKeysWithBucketIdList =
+                    partitionResult.get(subtaskId);
+            Set<Integer> requiredCacheBucketIds =
+                    strategy.getRequiredCacheBucketIds(subtaskId, 
lookupJoinParallelism);
+            for (Tuple2<RowData, Integer> joinKeysWithBucketId : 
joinKeysWithBucketIdList) {
+                assertThat(requiredCacheBucketIds.size()).isOne();
+                
assertThat(requiredCacheBucketIds).contains(joinKeysWithBucketId.f1);
+                assertThat(joinKeysWithBucketId.f1 % 
lookupJoinParallelism).isEqualTo(subtaskId);
+            }
+        }
+    }
+
+    @Test
+    public void testBucketNumLargerThanLookupJoinParallelism() throws 
Exception {
+        int numBuckets = 4;
+        int lookupJoinParallelism = 15;
+        FileStoreTable table = createTestTable(numBuckets);
+        BucketShuffleStrategy strategy = new BucketShuffleStrategy(numBuckets);
+        BucketShufflePartitioner bucketShufflePartitioner =
+                createBucketShufflePartitioner(table, numBuckets, strategy);
+        List<Tuple2<RowData, Integer>> joinKeysList =
+                getGroundTruthJoinKeysWithBucketId(table, numBuckets);
+        Map<Integer, List<Tuple2<RowData, Integer>>> partitionResult = new 
HashMap<>();
+        for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysList) {
+            int subtaskId =
+                    bucketShufflePartitioner.partition(
+                            joinKeysWithBucketId.f0, lookupJoinParallelism);
+            partitionResult.compute(
+                    subtaskId,
+                    (key, currentValue) -> {
+                        List<Tuple2<RowData, Integer>> newValue =
+                                (currentValue != null) ? currentValue : new 
ArrayList<>();
+                        newValue.add(joinKeysWithBucketId);
+                        return newValue;
+                    });
+        }
+        for (int subtaskId = 0; subtaskId < lookupJoinParallelism; 
++subtaskId) {
+            List<Tuple2<RowData, Integer>> joinKeysWithBucketIdList =
+                    partitionResult.get(subtaskId);
+            Set<Integer> requiredCacheBucketIds =
+                    strategy.getRequiredCacheBucketIds(subtaskId, 
lookupJoinParallelism);
+            for (Tuple2<RowData, Integer> joinKeysWithBucketId : 
joinKeysWithBucketIdList) {
+                
assertThat(requiredCacheBucketIds).contains(joinKeysWithBucketId.f1);
+                assertThat(subtaskId % 
numBuckets).isEqualTo(joinKeysWithBucketId.f1);
+            }
+        }
+    }
+
+    private List<Tuple2<RowData, Integer>> getGroundTruthJoinKeysWithBucketId(
+            FileStoreTable table, int numBuckets) throws IOException {
+        List<Tuple2<RowData, Integer>> joinKeyRows = new ArrayList<>();
+        Random random = new Random();
+        for (int bucketId = 0; bucketId < numBuckets; ++bucketId) {
+            ManifestEntry file = 
table.store().newScan().withBucket(bucketId).plan().files().get(0);
+            DataSplit dataSplit =
+                    DataSplit.builder()
+                            .withPartition(file.partition())
+                            .withBucket(file.bucket())
+                            
.withDataFiles(Collections.singletonList(file.file()))
+                            .withBucketPath("not used")
+                            .build();
+            int bucket = bucketId;
+            table.newReadBuilder()
+                    .newRead()
+                    .createReader(dataSplit)
+                    .forEachRemaining(
+                            internalRow -> {
+                                joinKeyRows.add(
+                                        Tuple2.of(
+                                                GenericRowData.of(
+                                                        
String.valueOf(random.nextInt(numBuckets)),
+                                                        internalRow.getInt(1)),
+                                                bucket));
+                            });
+        }
+        return joinKeyRows;
+    }
+
+    private BucketShufflePartitioner createBucketShufflePartitioner(
+            FileStoreTable table, int numBuckets, BucketShuffleStrategy 
strategy) {
+        BucketIdExtractor bucketIdExtractor =
+                new BucketIdExtractor(
+                        numBuckets,
+                        table.schema(),
+                        Arrays.asList("col1", "col2"),
+                        Collections.singletonList("col2"));
+        return new BucketShufflePartitioner(strategy, bucketIdExtractor);
+    }
+
+    private FileStoreTable createTestTable(int bucketNum) throws Exception {
+        String tableName = "Test";
+        String ddl =
+                String.format(
+                        "CREATE TABLE %s (col1 STRING, col2 INT, col3 FLOAT) 
WITH"
+                                + " ('bucket'='%s', 'bucket-key' = 'col2')",
+                        tableName, bucketNum);
+        batchSql(ddl);
+        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s 
VALUES ", tableName));
+        for (int index = 1; index < 1000; ++index) {
+            dml.append(String.format("('%s', %s, %s), ", index, index, 
101.1F));
+        }
+        dml.append(String.format("('%s', %s, %s)", 1000, 1000, 101.1F));
+        batchSql(dml.toString());
+        return paimonTable(tableName);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShuffleStrategyTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShuffleStrategyTest.java
new file mode 100644
index 0000000000..efe9694359
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShuffleStrategyTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+/** The test for {@link BucketShuffleStrategy}. */
+public class BucketShuffleStrategyTest {
+
+    @Test
+    public void testConstructorWithInvalidBuckets() {
+        Throwable thrown = catchThrowable(() -> new BucketShuffleStrategy(0));
+        assertThat(thrown)
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessage("Number of buckets should be positive.");
+    }
+
+    @Test
+    public void testGetTargetSubtaskIdWithMoreBucketsThanSubtasks() {
+        BucketShuffleStrategy bucketShuffleStrategy = new 
BucketShuffleStrategy(10);
+        int targetSubtaskId = bucketShuffleStrategy.getTargetSubtaskId(5, 
12345, 4);
+        assertThat(targetSubtaskId).isEqualTo(1);
+    }
+
+    @Test
+    public void testGetTargetSubtaskIdWithLessBucketsThanSubtasks() {
+        BucketShuffleStrategy bucketShuffleStrategy1 = new 
BucketShuffleStrategy(3);
+        int targetSubtaskId1 = bucketShuffleStrategy1.getTargetSubtaskId(2, 
12345, 10);
+        int expectedTargetSubtaskId1 = ((12345 % 3) * 3 + 2);
+        assertThat(targetSubtaskId1).isEqualTo(expectedTargetSubtaskId1);
+        BucketShuffleStrategy bucketShuffleStrategy2 = new 
BucketShuffleStrategy(2);
+        int targetSubtaskId2 = bucketShuffleStrategy2.getTargetSubtaskId(0, 
54321, 7);
+        int expectedTargetSubtaskId2 = (54321 % 4) * 2;
+        assertThat(targetSubtaskId2).isEqualTo(expectedTargetSubtaskId2);
+    }
+
+    @Test
+    public void testGetRequiredCacheBucketIdsWithMoreBucketsThanSubtasks() {
+        BucketShuffleStrategy bucketShuffleStrategy = new 
BucketShuffleStrategy(10);
+        Set<Integer> bucketIds = 
bucketShuffleStrategy.getRequiredCacheBucketIds(1, 4);
+        // Subtask ID 1 should cache all buckets where bucketId % 4 == 1
+        assertThat(bucketIds).containsExactlyInAnyOrder(1, 5, 9);
+    }
+
+    @Test
+    public void testGetRequiredCacheBucketIdsWithLessBucketsThanSubtasks() {
+        BucketShuffleStrategy bucketShuffleStrategy = new 
BucketShuffleStrategy(3);
+        Set<Integer> bucketIds = 
bucketShuffleStrategy.getRequiredCacheBucketIds(1, 10);
+        // Subtask ID 1 should cache the bucket where bucketId == 1 % 3
+        assertThat(bucketIds).containsExactlyInAnyOrder(1);
+    }
+
+    @Test
+    public void testSymmetryOfGetTargetSubtaskIdAndGetRequiredCacheBucketIds() 
{
+        for (int numBuckets = 1; numBuckets < 100; ++numBuckets) {
+            for (int numSubtasks = 1; numSubtasks < 100; ++numSubtasks) {
+                testCorrectness(numBuckets, numSubtasks);
+            }
+        }
+    }
+
+    private void testCorrectness(int numBuckets, int numSubtasks) {
+        BucketShuffleStrategy bucketShuffleStrategy = new 
BucketShuffleStrategy(numBuckets);
+        Set<Integer> allTargetSubtaskIds = new HashSet<>();
+        for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+            for (int joinKeyHash = 0; joinKeyHash < numSubtasks; 
++joinKeyHash) {
+                int subtaskId =
+                        bucketShuffleStrategy.getTargetSubtaskId(
+                                bucketId, joinKeyHash, numSubtasks);
+                assertThat(subtaskId).isNotNegative().isLessThan(numSubtasks);
+                allTargetSubtaskIds.add(subtaskId);
+                Set<Integer> requiredCacheBucketIds =
+                        
bucketShuffleStrategy.getRequiredCacheBucketIds(subtaskId, numSubtasks);
+                assertThat(requiredCacheBucketIds).contains(bucketId);
+            }
+        }
+        assertThat(allTargetSubtaskIds).hasSize(numSubtasks);
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index af5dec0f87..dbf9f99a32 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -105,7 +105,11 @@ public class FileStoreLookupFunctionTest {
     private FileStoreLookupFunction createLookupFunction(
             FileStoreTable table, boolean joinEqualPk) {
         return new FileStoreLookupFunction(
-                table, new int[] {0, 1}, joinEqualPk ? new int[] {0, 1} : new 
int[] {1}, null);
+                table,
+                new int[] {0, 1},
+                joinEqualPk ? new int[] {0, 1} : new int[] {1},
+                null,
+                null);
     }
 
     private FileStoreTable createFileStoreTable(
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
new file mode 100644
index 0000000000..a4e485d829
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for custom lookup shuffle. */
+public class LookupJoinBucketShuffleITCase extends CatalogITCaseBase {
+
+    private static final int BUCKET_NUMBER = 5;
+
+    private static final int ROW_NUMBER = 100;
+
+    /**
+     * Test the non-pk full cached table where join key count is 1 and join 
key equals the bucket
+     * key.
+     */
+    @Test
+    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase1() 
throws Exception {
+        String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col1");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1")
+                        .replace("DIM", nonPrimaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /**
+     * Test the non-pk full cached table where join key count is 2, join keys 
equal the bucket keys.
+     */
+    @Test
+    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase2() 
throws Exception {
+        String nonPrimaryKeyDimTable = 
createNonPrimaryKeyDimTable("col1,col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 AND T.col2 = D.col2")
+                        .replace("DIM", nonPrimaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /**
+     * Test the non-pk full cached table where join key count is 2, bucket key 
is one of join key.
+     */
+    @Test
+    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase3() 
throws Exception {
+        String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 AND T.col2 = D.col2")
+                        .replace("DIM", nonPrimaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /**
+     * Test the non-pk full cached table where join keys contain constant 
keys, bucket key is one of
+     * join key.
+     */
+    @Test
+    public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase4() 
throws Exception {
+        String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 AND T.col2 = D.col2 AND "
+                                + "D.col4 = CAST('2024-06-09' AS DATE) AND 
D.col5 = 123.45 ")
+                        .replace("DIM", nonPrimaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk full cached table where join key count is 1, join key 
equals the bucket key. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase1() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(1, true, "col1");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk full cached table where join key count is 2, join keys 
equal the bucket keys. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase2() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, true, 
"col1,col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk full cached table where join key count is 2, bucket key is 
one of join key. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase3() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, true, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /**
+     * Test the pk full cached table where join keys contain constant keys, 
bucket key is one of
+     * join key.
+     */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase4() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, true, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 AND T.col2 = D.col2 AND "
+                                + "D.col4 = CAST('2024-06-09' AS DATE) AND 
D.col5 = 123.45 ")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk auto cached table where join key count is 1, join key 
equals the bucket key. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase1() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(1, false, "col1");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk auto cached table where join key count is 2, join keys 
equal the bucket keys. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase2() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, false, 
"col1,col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 "
+                                + "AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk auto cached table where join key count is 2, bucket key is 
one of join key. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase3() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, false, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 "
+                                + "AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /**
+     * Test the pk auto cached table where join keys contain constant keys, 
bucket key is one of
+     * join key.
+     */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase4() throws 
Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, false, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, 
D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON 
T.col1 = D.col1 AND T.col2 = D.col2 AND "
+                                + "D.col4 = CAST('2024-06-09' AS DATE) AND 
D.col5 = 123.45")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    private void testBucketNumberCases(String query) throws Exception {
+        List<Row> groundTruthRows = getGroundTruthRows();
+        // Test the case that bucket number = lookup parallelism.
+        sEnv.getConfig()
+                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
BUCKET_NUMBER);
+        List<Row> result1 = streamSqlBlockIter(query).collect(ROW_NUMBER);
+        
assertThat(result1).containsExactlyInAnyOrderElementsOf(groundTruthRows);
+        // Test the case that bucket number > lookup parallelism.
+        
sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 3);
+        List<Row> result2 = streamSqlBlockIter(query).collect(ROW_NUMBER);
+        
assertThat(result2).containsExactlyInAnyOrderElementsOf(groundTruthRows);
+        // Test the case that bucket number < lookup parallelism.
+        
sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
 8);
+        List<Row> result3 = streamSqlBlockIter(query).collect(ROW_NUMBER);
+        
assertThat(result3).containsExactlyInAnyOrderElementsOf(groundTruthRows);
+    }
+
+    private String createPrimaryKeyDimTable(
+            int primaryKeyNumber, boolean fullCache, String bucketKey) {
+        createSourceTable();
+        String tableName = "DIM_PRIMARY";
+        if (fullCache) {
+            tableName += "_full_cache";
+        } else {
+            tableName += "_auto";
+        }
+        if (bucketKey != null) {
+            tableName += "_" + bucketKey.split(",").length;
+        }
+        String ddl;
+        if (primaryKeyNumber == 1) {
+            tableName += "_1";
+            ddl =
+                    String.format(
+                            "CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, 
col4 DATE, col5 DECIMAL(10, 2), "
+                                    + "PRIMARY KEY (col1) NOT ENFORCED) WITH"
+                                    + " ('continuous.discovery-interval'='1 
ms', 'bucket'='%s'",
+                            tableName, BUCKET_NUMBER);
+        } else {
+            tableName += "_2";
+            ddl =
+                    String.format(
+                            "CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, 
col4 DATE, col5 DECIMAL(10, 2), "
+                                    + "PRIMARY KEY (col1, col2) NOT ENFORCED) 
WITH"
+                                    + " ('continuous.discovery-interval'='1 
ms', 'bucket'='%s'",
+                            tableName, BUCKET_NUMBER);
+        }
+        if (bucketKey != null) {
+            ddl += ", 'bucket-key' = '" + bucketKey + "'";
+        }
+        if (fullCache) {
+            ddl += " ,'lookup.cache' = 'full')";
+        } else {
+            ddl += " )";
+        }
+        batchSql(ddl);
+        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s 
VALUES ", tableName));
+        for (int index = 1; index < ROW_NUMBER; ++index) {
+            dml.append(
+                    String.format(
+                            "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 
123.45), ",
+                            index, index * 10, index * 10));
+        }
+        dml.append(
+                String.format(
+                        "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45)",
+                        ROW_NUMBER, ROW_NUMBER * 10, ROW_NUMBER * 10));
+        batchSql(dml.toString());
+        return tableName;
+    }
+
+    private String createNonPrimaryKeyDimTable(String bucketKey) {
+        createSourceTable();
+        String tableName = "DIM";
+        if (bucketKey != null) {
+            tableName += "_" + bucketKey.split(",").length;
+        }
+        String ddl =
+                String.format(
+                        "CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, 
col4 DATE,"
+                                + " col5 DECIMAL(10, 2)) WITH"
+                                + " ('continuous.discovery-interval'='1 ms', 
'bucket'='%s'",
+                        tableName, BUCKET_NUMBER);
+        if (bucketKey != null) {
+            ddl += ", 'bucket-key' = '" + bucketKey + "')";
+        }
+        batchSql(ddl);
+        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s 
VALUES ", tableName));
+        for (int index = 1; index < ROW_NUMBER; ++index) {
+            dml.append(
+                    String.format(
+                            "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 
123.45), ",
+                            index, index * 10, index * 10));
+        }
+        dml.append(
+                String.format(
+                        "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45)",
+                        ROW_NUMBER, ROW_NUMBER * 10, ROW_NUMBER * 10));
+        batchSql(dml.toString());
+        return tableName;
+    }
+
+    private void createSourceTable() {
+        String ddl = "CREATE TABLE T (col1 INT, col2 STRING, col3 INT, 
`proc_time` AS PROCTIME())";
+        batchSql(ddl);
+        StringBuilder dml = new StringBuilder("INSERT INTO T VALUES ");
+        for (int index = 1; index < ROW_NUMBER; ++index) {
+            dml.append(String.format("(%s, '%s', %s), ", index, index * 10, 
index * 10));
+        }
+        dml.append(String.format("(%s, '%s', %s)", ROW_NUMBER, ROW_NUMBER * 
10, ROW_NUMBER * 10));
+        batchSql(dml.toString());
+    }
+
+    private List<Row> getGroundTruthRows() {
+        List<Row> results = new ArrayList<>();
+        for (int index = 1; index <= ROW_NUMBER; ++index) {
+            results.add(Row.of(index, String.valueOf(index * 10)));
+        }
+        return results;
+    }
+}
diff --git a/paimon-flink/paimon-flink1-common/pom.xml 
b/paimon-flink/paimon-flink1-common/pom.xml
index d580b5dcda..1cfca5226e 100644
--- a/paimon-flink/paimon-flink1-common/pom.xml
+++ b/paimon-flink/paimon-flink1-common/pom.xml
@@ -49,6 +49,12 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git 
a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLookupCustomShuffle.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLookupCustomShuffle.java
new file mode 100644
index 0000000000..6e31c1b800
--- /dev/null
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLookupCustomShuffle.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * This interface is designed to allow connectors to provide a custom 
partitioning strategy for the
+ * data that is fed into the {@link LookupTableSource}. This enables the Flink 
Planner to optimize
+ * the distribution of input stream across different subtasks of lookup-join 
node to match the
+ * distribution of data in the external data source.
+ */
+@PublicEvolving
+public interface SupportsLookupCustomShuffle {
+    /**
+     * This method is used to retrieve a custom partitioner that will be 
applied to the input stream
+     * of lookup-join node.
+     *
+     * @return An {@link InputDataPartitioner} that defines how records should 
be distributed across
+     *     the different subtasks. If the connector expects the input data to 
remain in its original
+     *     distribution, an {@link Optional#empty()} should be returned.
+     */
+    Optional<InputDataPartitioner> getPartitioner();
+
+    /**
+     * This interface is responsible for providing custom partitioning logic 
for the RowData
+     * records. We didn't use {@link Partitioner} directly because the input 
data is always RowData
+     * type, and we need to extract all join keys from the input data before 
send it to partitioner.
+     */
+    @PublicEvolving
+    interface InputDataPartitioner extends Serializable {
+        /**
+         * Determining the partition id for each input data.
+         *
+         * <p>This data is projected to only including all join keys before 
emit to this
+         * partitioner.
+         *
+         * @param joinKeys The extracted join key for each input record.
+         * @param numPartitions The total number of partition.
+         * @return An integer representing the partition id to which the 
record should be sent.
+         */
+        int partition(RowData joinKeys, int numPartitions);
+
+        /**
+         * Returns information about the determinism of this partitioner.
+         *
+         * <p>It returns true if and only if a call to the {@link 
#partition(RowData, int)} method
+         * is guaranteed to always return the same result given the same 
joinKeyRow. If the
+         * partitioning logic depends on not purely functional like <code>
+         * random(), date(), now(), ...</code> this method must return false.
+         *
+         * <p>If this method return false, planner may not apply this 
partitioner in upsert mode to
+         * avoid out-of-order of the changelog events.
+         */
+        default boolean isDeterministic() {
+            return true;
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
similarity index 71%
rename from 
paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
rename to 
paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
index 460fea55ad..1087dcb65c 100644
--- 
a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ 
b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -19,6 +19,10 @@
 package org.apache.paimon.flink.utils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.FunctionContext;
+
+import javax.annotation.Nullable;
 
 /** Utility methods about Flink runtime context to resolve compatibility 
issues. */
 public class RuntimeContextUtils {
@@ -29,4 +33,16 @@ public class RuntimeContextUtils {
     public static int getIndexOfThisSubtask(RuntimeContext context) {
         return context.getIndexOfThisSubtask();
     }
+
+    public static @Nullable Integer 
getNumberOfParallelSubtasks(FunctionContext context) {
+        return null;
+    }
+
+    public static @Nullable Integer getIndexOfThisSubtask(FunctionContext 
context) {
+        return null;
+    }
+
+    public static boolean preferCustomShuffle(LookupTableSource.LookupContext 
context) {
+        return false;
+    }
 }
diff --git a/paimon-flink/paimon-flink2-common/pom.xml 
b/paimon-flink/paimon-flink2-common/pom.xml
index 2a5271f78b..845534c71d 100644
--- a/paimon-flink/paimon-flink2-common/pom.xml
+++ b/paimon-flink/paimon-flink2-common/pom.xml
@@ -49,6 +49,12 @@ under the License.
             <version>${flink.version}</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <version>${flink.version}</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-runtime</artifactId>
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
similarity index 68%
rename from 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
rename to 
paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
index 34e0d041b6..ff5fa86812 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ 
b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -19,6 +19,10 @@
 package org.apache.paimon.flink.utils;
 
 import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.FunctionContext;
+
+import javax.annotation.Nullable;
 
 /** Utility methods about Flink runtime context to resolve compatibility 
issues. */
 public class RuntimeContextUtils {
@@ -29,4 +33,16 @@ public class RuntimeContextUtils {
     public static int getIndexOfThisSubtask(RuntimeContext context) {
         return context.getTaskInfo().getIndexOfThisSubtask();
     }
+
+    public static @Nullable Integer 
getNumberOfParallelSubtasks(FunctionContext context) {
+        return context.getTaskInfo().getNumberOfParallelSubtasks();
+    }
+
+    public static @Nullable Integer getIndexOfThisSubtask(FunctionContext 
context) {
+        return context.getTaskInfo().getIndexOfThisSubtask();
+    }
+
+    public static boolean preferCustomShuffle(LookupTableSource.LookupContext 
context) {
+        return context.preferCustomShuffle();
+    }
 }

Reply via email to