This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new df3033c30c [flink] Support custom lookup shuffle for fixed bucket table (#5478)
df3033c30c is described below
commit df3033c30cbe5de8079b9aba910e0b5f2d0059ea
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Fri Apr 18 11:21:14 2025 +0800
[flink] Support custom lookup shuffle for fixed bucket table (#5478)
---
docs/content/flink/sql-lookup.md | 17 ++
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ---
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ---
.../paimon/flink/utils/RuntimeContextUtils.java | 32 ---
.../flink/lookup/FileStoreLookupFunction.java | 26 +-
.../lookup/partitioner/BucketIdExtractor.java | 100 +++++++
.../partitioner/BucketShufflePartitioner.java | 46 +++
.../lookup/partitioner/BucketShuffleStrategy.java | 151 ++++++++++
.../flink/lookup/partitioner/ShuffleStrategy.java | 48 ++++
.../paimon/flink/source/BaseDataTableSource.java | 57 +++-
.../paimon/flink/lookup/BucketIdExtractorTest.java | 257 +++++++++++++++++
.../flink/lookup/BucketShufflePartitionerTest.java | 215 ++++++++++++++
.../flink/lookup/BucketShuffleStrategyTest.java | 103 +++++++
.../flink/lookup/FileStoreLookupFunctionTest.java | 6 +-
.../lookup/LookupJoinBucketShuffleITCase.java | 312 +++++++++++++++++++++
paimon-flink/paimon-flink1-common/pom.xml | 6 +
.../abilities/SupportsLookupCustomShuffle.java | 81 ++++++
.../paimon/flink/utils/RuntimeContextUtils.java | 16 ++
paimon-flink/paimon-flink2-common/pom.xml | 6 +
.../paimon/flink/utils/RuntimeContextUtils.java | 16 ++
20 files changed, 1456 insertions(+), 103 deletions(-)
diff --git a/docs/content/flink/sql-lookup.md b/docs/content/flink/sql-lookup.md
index d28b7333ce..3b160dbfc9 100644
--- a/docs/content/flink/sql-lookup.md
+++ b/docs/content/flink/sql-lookup.md
@@ -120,6 +120,23 @@ your streaming job may be blocked. You can try to use `audit_log` system table f
(convert CDC stream to append stream).
{{< /hint >}}
+## Large Scale Lookup (Fixed Bucket)
+
+By default, each Flink subtask stores a whole copy of the lookup table. If the amount of data in `customers`
+(the lookup table) is too large for a single subtask, you can enable the shuffle lookup optimization as follows
+(for Flink 2.0+ and fixed-bucket Paimon tables). This optimization sends data of the same bucket to designated
+subtask(s), so each Flink subtask only needs to store a part of the whole data.
+
+```sql
+-- enrich each order with customer information
+SELECT /*+ LOOKUP('table'='c', 'shuffle'='true') */ o.order_id, o.total, c.country, c.zip
+FROM orders AS o
+JOIN customers
+FOR SYSTEM_TIME AS OF o.proc_time AS c
+ON o.customer_id = c.id;
+```
+
## Dynamic Partition
In traditional data warehouses, each partition often maintains the latest full data, so this partition table only
diff --git a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
deleted file mode 100644
index 460fea55ad..0000000000
--- a/paimon-flink/paimon-flink-1.15/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/** Utility methods about Flink runtime context to resolve compatibility issues. */
-public class RuntimeContextUtils {
- public static int getNumberOfParallelSubtasks(RuntimeContext context) {
- return context.getNumberOfParallelSubtasks();
- }
-
- public static int getIndexOfThisSubtask(RuntimeContext context) {
- return context.getIndexOfThisSubtask();
- }
-}
diff --git a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
deleted file mode 100644
index 460fea55ad..0000000000
--- a/paimon-flink/paimon-flink-1.16/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/** Utility methods about Flink runtime context to resolve compatibility issues. */
-public class RuntimeContextUtils {
- public static int getNumberOfParallelSubtasks(RuntimeContext context) {
- return context.getNumberOfParallelSubtasks();
- }
-
- public static int getIndexOfThisSubtask(RuntimeContext context) {
- return context.getIndexOfThisSubtask();
- }
-}
diff --git a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
deleted file mode 100644
index 460fea55ad..0000000000
--- a/paimon-flink/paimon-flink-1.17/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.utils;
-
-import org.apache.flink.api.common.functions.RuntimeContext;
-
-/** Utility methods about Flink runtime context to resolve compatibility issues. */
-public class RuntimeContextUtils {
- public static int getNumberOfParallelSubtasks(RuntimeContext context) {
- return context.getNumberOfParallelSubtasks();
- }
-
- public static int getIndexOfThisSubtask(RuntimeContext context) {
- return context.getIndexOfThisSubtask();
- }
-}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
index aff57c6c36..fd8b629507 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/FileStoreLookupFunction.java
@@ -25,6 +25,8 @@ import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.flink.FlinkConnectorOptions.LookupCacheMode;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
@@ -33,6 +35,7 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
@@ -84,6 +87,7 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
private final List<String> joinKeys;
@Nullable private final Predicate predicate;
@Nullable private final RefreshBlacklist refreshBlacklist;
+ @Nullable private final ShuffleStrategy strategy;
private final List<InternalRow.FieldGetter> projectFieldsGetters;
@@ -103,7 +107,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
FileStoreTable table,
int[] projection,
int[] joinKeyIndex,
- @Nullable Predicate predicate) {
+ @Nullable Predicate predicate,
+ @Nullable ShuffleStrategy strategy) {
if (!TableScanUtils.supportCompactDiffStreamingReading(table)) {
TableScanUtils.streamingReadingValidate(table);
}
@@ -143,6 +148,8 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
this.refreshBlacklist =
RefreshBlacklist.create(
table.options().get(LOOKUP_REFRESH_TIME_PERIODS_BLACKLIST.key()));
+
+ this.strategy = strategy;
}
public void open(FunctionContext context) throws Exception {
@@ -404,8 +411,21 @@ public class FileStoreLookupFunction implements Serializable, Closeable {
* @return the set of bucket IDs to be cached
*/
protected Set<Integer> getRequireCachedBucketIds() {
- // TODO: Implement the method when Flink support bucket shuffle for lookup join.
- return null;
+ if (strategy == null) {
+ return null;
+ }
+ @Nullable
+ Integer indexOfThisSubtask = RuntimeContextUtils.getIndexOfThisSubtask(functionContext);
+ @Nullable
+ Integer numberOfParallelSubtasks = RuntimeContextUtils.getNumberOfParallelSubtasks(functionContext);
+ if (indexOfThisSubtask == null) {
+ Preconditions.checkState(numberOfParallelSubtasks == null);
+ return null;
+ } else {
+ Preconditions.checkState(numberOfParallelSubtasks != null);
+ }
+ return strategy.getRequiredCacheBucketIds(indexOfThisSubtask, numberOfParallelSubtasks);
}
protected void setCacheRowFilter(@Nullable Filter<InternalRow> cacheRowFilter) {
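For context on the guard above: a `null` return from `getRequireCachedBucketIds` means "no restriction", i.e. the subtask caches every bucket, which matches the behavior before this commit. Below is a minimal sketch of that contract, under my assumption (not stated in the diff) that a runtime lacking subtask information simply falls back to full caching; the names are hypothetical.

```java
import java.util.Set;

/** Illustrative sketch of the cache-scoping contract; not Paimon's actual class. */
class CacheScopeSketch {

    interface Strategy {
        Set<Integer> getRequiredCacheBucketIds(int subtaskId, int numSubtasks);
    }

    /** Returns null ("cache all buckets") unless both the strategy and the runtime info exist. */
    static Set<Integer> requiredBuckets(Strategy strategy, Integer subtaskIndex, Integer parallelism) {
        if (strategy == null || subtaskIndex == null || parallelism == null) {
            return null; // fall back to caching the whole table, the pre-existing behavior
        }
        return strategy.getRequiredCacheBucketIds(subtaskIndex, parallelism);
    }
}
```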
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketIdExtractor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketIdExtractor.java
new file mode 100644
index 0000000000..ad0bb1f55d
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketIdExtractor.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.flink.FlinkRowWrapper;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.sink.KeyAndBucketExtractor;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * {@link BucketIdExtractor} is used to extract the bucket id from join keys. It requires that the
+ * join keys contain all bucket keys and projects the bucket key fields from the given join key row.
+ */
+public class BucketIdExtractor implements Serializable {
+
+ private final int numBuckets;
+
+ private final TableSchema tableSchema;
+
+ private final List<String> joinKeyFieldNames;
+
+ private final List<String> bucketKeyFieldNames;
+
+ private Projection bucketKeyProjection;
+
+ public BucketIdExtractor(
+ int numBuckets,
+ TableSchema tableSchema,
+ List<String> joinKeyFieldNames,
+ List<String> bucketKeyFieldNames) {
+ checkState(
+ new HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames),
+ "The join keys must contain all bucket keys.");
+ checkState(numBuckets > 0, "Number of buckets should be positive.");
+ this.numBuckets = numBuckets;
+ this.joinKeyFieldNames = joinKeyFieldNames;
+ this.bucketKeyFieldNames = bucketKeyFieldNames;
+ this.tableSchema = tableSchema;
+ }
+
+ public int extractBucketId(RowData joinKeyRow) {
+ checkState(joinKeyRow.getArity() == joinKeyFieldNames.size());
+ if (bucketKeyProjection == null) {
+ bucketKeyProjection = generateBucketKeyProjection();
+ }
+ FlinkRowWrapper internalRow = new FlinkRowWrapper(joinKeyRow);
+ BinaryRow bucketKey = bucketKeyProjection.apply(internalRow);
+ int bucket =
+ KeyAndBucketExtractor.bucket(
+ KeyAndBucketExtractor.bucketKeyHashCode(bucketKey), numBuckets);
+ checkState(bucket < numBuckets);
+ return bucket;
+ }
+
+ private Projection generateBucketKeyProjection() {
+ int[] bucketKeyIndexes =
+ bucketKeyFieldNames.stream().mapToInt(joinKeyFieldNames::indexOf).toArray();
+ List<DataField> joinKeyDataFields =
+ joinKeyFieldNames.stream()
+ .map(
+ joinKeyFieldName ->
+ tableSchema
+ .fields()
+ .get(
+ tableSchema
+ .fieldNames()
+ .indexOf(joinKeyFieldName)))
+ .collect(Collectors.toList());
+ return CodeGenUtils.newProjection(new RowType(joinKeyDataFields), bucketKeyIndexes);
+ }
+}
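Conceptually, the extractor above reduces to "project the bucket key fields out of the join key row, hash them, and map the hash into [0, numBuckets)". A self-contained sketch of that flow follows; it is illustrative only, and `Arrays.hashCode` is a stand-in for Paimon's `KeyAndBucketExtractor.bucketKeyHashCode`, which uses its own hash function.

```java
import java.util.Arrays;
import java.util.List;

/** Illustrative mirror of the projection + hash + modulo flow in BucketIdExtractor. */
class BucketIdSketch {

    static int extractBucketId(
            List<String> joinKeyNames, List<String> bucketKeyNames, Object[] joinKeyRow, int numBuckets) {
        // Project the bucket key fields out of the join key row, in bucket-key order.
        Object[] bucketKey =
                bucketKeyNames.stream().map(name -> joinKeyRow[joinKeyNames.indexOf(name)]).toArray();
        // Hash the projected key and map it into [0, numBuckets).
        int hash = Arrays.hashCode(bucketKey); // stand-in for the real bucket-key hash
        return Math.floorMod(hash, numBuckets);
    }
}
```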
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShufflePartitioner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShufflePartitioner.java
new file mode 100644
index 0000000000..20ef5e8c5a
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShufflePartitioner.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle.InputDataPartitioner;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.MathUtils;
+
+/**
+ * {@link BucketShufflePartitioner} class partitions rows based on the bucket id. It uses a custom
+ * strategy and an extractor to determine the target partition for a given set of join keys.
+ */
+public class BucketShufflePartitioner implements InputDataPartitioner {
+
+ private final ShuffleStrategy strategy;
+
+ private final BucketIdExtractor extractor;
+
+ public BucketShufflePartitioner(ShuffleStrategy strategy, BucketIdExtractor extractor) {
+ this.strategy = strategy;
+ this.extractor = extractor;
+ }
+
+ @Override
+ public int partition(RowData joinKeys, int numPartitions) {
+ int bucketId = extractor.extractBucketId(joinKeys);
+ int joinKeyHash = MathUtils.murmurHash(joinKeys.hashCode());
+ return strategy.getTargetSubtaskId(bucketId, joinKeyHash, numPartitions);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShuffleStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShuffleStrategy.java
new file mode 100644
index 0000000000..4bf02a19e9
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/BucketShuffleStrategy.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.paimon.utils.Preconditions.checkState;
+
+/**
+ * {@link BucketShuffleStrategy} defines a strategy for determining the target subtask id for a
+ * given bucket id and join key hash. It also provides a method to retrieve the set of bucket ids
+ * that are required to be cached by a specific subtask.
+ */
+public class BucketShuffleStrategy implements ShuffleStrategy {
+
+ private final int numBuckets;
+
+ public BucketShuffleStrategy(int numBuckets) {
+ checkState(numBuckets > 0, "Number of buckets should be positive.");
+ this.numBuckets = numBuckets;
+ }
+
+ /**
+ * Determines the target subtask ID for a given bucket ID and join key hash.
+ *
+ * <p>The method uses two different strategies based on the comparison between the number of
+ * buckets and the number of subtasks:
+ *
+ * <p>1. If the number of buckets is greater than or equal to the number of subtasks (numBuckets
+ * >= numSubtasks), then the target subtask ID is determined by assigning buckets to subtasks in
+ * a round-robin manner:
+ *
+ * {@code
+ * e.g., numBuckets = 5, numSubtasks = 3
+ * Bucket 0 -> Subtask 0
+ * Bucket 1 -> Subtask 1
+ * Bucket 2 -> Subtask 2
+ * Bucket 3 -> Subtask 0
+ * Bucket 4 -> Subtask 1
+ *
+ * }
+ *
+ * <p>2. If the number of buckets is less than the number of subtasks (numBuckets <
+ * numSubtasks), then the target subtask ID is determined by distributing subtasks to buckets in
+ * a round-robin manner:
+ *
+ * {@code
+ * e.g., numBuckets = 2, numSubtasks = 5
+ * Bucket 0 -> Subtask 0,2,4
+ * Bucket 1 -> Subtask 1,3
+ *
+ * }
+ *
+ * @param bucketId The ID of the bucket.
+ * @param joinKeyHash The hash of the join key.
+ * @param numSubtasks The total number of target subtasks.
+ * @return The target subtask ID for the given bucket ID and join key hash.
+ */
+ public int getTargetSubtaskId(int bucketId, int joinKeyHash, int numSubtasks) {
+ if (numBuckets >= numSubtasks) {
+ return bucketId % numSubtasks;
+ } else {
+ // Calculate the number of ranges to group the subtasks
+ int bucketNumMultipleRange = numSubtasks / numBuckets;
+ int adjustedBucketNumMultipleRange;
+ // If the start index of the current bucket plus the total range exceeds
+ // the number of subtasks, use the calculated range without adjustment.
+ // Example: bucketId = 1, numSubtasks = 4, numBuckets = 2
+ if ((bucketId + bucketNumMultipleRange * numBuckets) >= numSubtasks) {
+ adjustedBucketNumMultipleRange = bucketNumMultipleRange;
+ } else {
+ // Otherwise, increment the range by one to ensure all subtasks are included.
+ // Example: bucketId = 0, numSubtasks = 5, numBuckets = 2
+ adjustedBucketNumMultipleRange = bucketNumMultipleRange + 1;
+ }
+ return (joinKeyHash % adjustedBucketNumMultipleRange) * numBuckets + bucketId;
+ }
+ }
+
+ /**
+ * Retrieves the set of bucket IDs that are required to be cached by a
specific subtask. This
+ * method uses the same strategy as {@link #getTargetSubtaskId(int, int,
int)} to ensure
+ * consistency in bucket to subtask mapping.
+ *
+ * <p>The method uses two different strategies based on the comparison between the number of
+ * buckets and the number of subtasks:
+ *
+ * <p>1. If the number of buckets is greater than or equal to the number of subtasks (numBuckets
+ * >= numSubtasks), then each subtask caches buckets in a round-robin manner.
+ *
+ * {@code
+ * e.g., numBuckets = 5, numSubtasks = 3
+ * Subtask 0 -> Bucket 0,3
+ * Subtask 1 -> Bucket 1,4
+ * Subtask 2 -> Bucket 2
+ *
+ * }
+ *
+ * <p>2. If the number of buckets is less than the number of subtasks (numBuckets <
+ * numSubtasks), then each subtask caches buckets by polling from buckets in a round-robin
+ * manner:
+ *
+ * {@code
+ * e.g., numBuckets = 2, numSubtasks = 5
+ * Subtask 0 -> Bucket 0
+ * Subtask 1 -> Bucket 1
+ * Subtask 2 -> Bucket 0
+ * Subtask 3 -> Bucket 1
+ * Subtask 4 -> Bucket 0
+ *
+ * }
+ *
+ * @param subtaskId The ID of the subtask.
+ * @param numSubtasks The total number of subtasks.
+ * @return The set of bucket IDs that are required to be cached by the specified subtask.
+ */
+ public Set<Integer> getRequiredCacheBucketIds(int subtaskId, int numSubtasks) {
+ Set<Integer> requiredCacheBucketIds = new HashSet<>();
+ if (numBuckets >= numSubtasks) {
+ for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+ if (bucketId % numSubtasks == subtaskId) {
+ requiredCacheBucketIds.add(bucketId);
+ }
+ }
+ } else {
+ for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+ if (bucketId == subtaskId % numBuckets) {
+ requiredCacheBucketIds.add(bucketId);
+ }
+ }
+ }
+ return requiredCacheBucketIds;
+ }
+}
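To make the two javadoc examples above concrete, here is a tiny standalone driver that reproduces the 5-buckets/3-subtasks routing table and the 2-buckets/5-subtasks cache assignment. The arithmetic is taken directly from the class; only the `main` scaffolding is mine.

```java
/** Reproduces the javadoc examples of BucketShuffleStrategy. */
public class BucketShuffleDemo {

    public static void main(String[] args) {
        // Case 1: numBuckets (5) >= numSubtasks (3): bucket -> bucket % numSubtasks.
        for (int bucket = 0; bucket < 5; bucket++) {
            System.out.println("Bucket " + bucket + " -> Subtask " + (bucket % 3));
        }
        // Case 2: numBuckets (2) < numSubtasks (5): subtask s caches bucket s % numBuckets.
        for (int subtask = 0; subtask < 5; subtask++) {
            System.out.println("Subtask " + subtask + " -> Bucket " + (subtask % 2));
        }
    }
}
```

Running it prints exactly the mappings listed in the javadoc: Bucket 0 -> Subtask 0 through Bucket 4 -> Subtask 1, and Subtask 0 -> Bucket 0 through Subtask 4 -> Bucket 0.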
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/ShuffleStrategy.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/ShuffleStrategy.java
new file mode 100644
index 0000000000..2917777afe
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/lookup/partitioner/ShuffleStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup.partitioner;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Shuffle strategy used to assign buckets to subtasks and to get all the required buckets for a
+ * given subtask.
+ */
+public interface ShuffleStrategy extends Serializable {
+
+ /**
+ * Calculates and returns the subtask ID that a given bucket should be assigned to.
+ *
+ * @param bucketId The ID of the bucket.
+ * @param joinKeyHash The hash value of the join key.
+ * @param numSubtasks The total number of subtasks.
+ * @return The subtask ID that the bucket should be assigned to.
+ */
+ int getTargetSubtaskId(int bucketId, int joinKeyHash, int numSubtasks);
+
+ /**
+ * Returns all the bucket IDs required for a given subtask.
+ *
+ * @param subtaskId The ID of the subtask.
+ * @param numSubtasks The total number of subtasks.
+ * @return A set containing all the bucket IDs required for the subtask.
+ */
+ Set<Integer> getRequiredCacheBucketIds(int subtaskId, int numSubtasks);
+}
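The interface deliberately leaves room for strategies other than bucket-based routing. As a purely hypothetical illustration of the contract (not part of this commit), a strategy that routed rows by join-key hash alone would have to declare every bucket as cacheable on every subtask, forfeiting the cache savings that `BucketShuffleStrategy` provides:

```java
import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;

import java.util.HashSet;
import java.util.Set;

/** Hypothetical strategy: routes by join-key hash only, so no cache scoping is possible. */
class HashOnlyShuffleStrategy implements ShuffleStrategy {

    private final int numBuckets;

    HashOnlyShuffleStrategy(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    @Override
    public int getTargetSubtaskId(int bucketId, int joinKeyHash, int numSubtasks) {
        // The bucket id is ignored; any bucket can land on any subtask.
        return Math.floorMod(joinKeyHash, numSubtasks);
    }

    @Override
    public Set<Integer> getRequiredCacheBucketIds(int subtaskId, int numSubtasks) {
        // Every subtask may receive rows of every bucket, so all buckets must be cached.
        Set<Integer> all = new HashSet<>();
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            all.add(bucket);
        }
        return all;
    }
}
```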
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 77de76d867..81cced9a74 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -28,10 +28,16 @@ import org.apache.paimon.flink.log.LogSourceProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.lookup.FileStoreLookupFunction;
import org.apache.paimon.flink.lookup.LookupRuntimeProviderFactory;
+import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
+import org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
+import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
+import org.apache.paimon.flink.lookup.partitioner.ShuffleStrategy;
+import org.apache.paimon.flink.utils.RuntimeContextUtils;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
@@ -46,20 +52,25 @@ import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.source.LookupTableSource;
import org.apache.flink.table.connector.source.SourceProvider;
import org.apache.flink.table.connector.source.abilities.SupportsAggregatePushDown;
+import org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle;
import org.apache.flink.table.connector.source.abilities.SupportsWatermarkPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.expressions.AggregateExpression;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
@@ -82,7 +93,11 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
* batch mode or streaming mode.
*/
public abstract class BaseDataTableSource extends FlinkTableSource
- implements LookupTableSource, SupportsWatermarkPushDown, SupportsAggregatePushDown {
+ implements LookupTableSource,
+ SupportsWatermarkPushDown,
+ SupportsAggregatePushDown,
+ SupportsLookupCustomShuffle {
+ private static final Logger LOG = LoggerFactory.getLogger(BaseDataTableSource.class);
private static final List<ConfigOption<?>> TIME_TRAVEL_OPTIONS =
Arrays.asList(
@@ -98,7 +113,7 @@ public abstract class BaseDataTableSource extends FlinkTableSource
protected final boolean unbounded;
protected final DynamicTableFactory.Context context;
@Nullable protected final LogStoreTableFactory logStoreTableFactory;
-
+ @Nullable private BucketShufflePartitioner bucketShufflePartitioner;
@Nullable protected WatermarkStrategy<RowData> watermarkStrategy;
@Nullable protected Long countPushed;
@@ -276,7 +291,32 @@ public abstract class BaseDataTableSource extends FlinkTableSource
protected FileStoreLookupFunction getFileStoreLookupFunction(
LookupContext context, FileStoreTable table, int[] projection, int[] joinKey) {
- return new FileStoreLookupFunction(table, projection, joinKey, predicate);
+ // 1. Get the join key field names from the join key indexes.
+ List<String> joinKeyFieldNames =
+ Arrays.stream(joinKey)
+ .mapToObj(i -> table.rowType().getFieldNames().get(projection[i]))
+ .collect(Collectors.toList());
+ // 2. Get the bucket key field names from the table schema.
+ List<String> bucketKeyFieldNames = table.schema().bucketKeys();
+ boolean useCustomShuffle =
+ supportBucketShufflePartitioner(joinKeyFieldNames, bucketKeyFieldNames)
+ && RuntimeContextUtils.preferCustomShuffle(context);
+ int numBuckets;
+ ShuffleStrategy strategy = null;
+ if (useCustomShuffle) {
+ numBuckets = table.store().options().bucket();
+ BucketIdExtractor extractor =
+ new BucketIdExtractor(
+ numBuckets, table.schema(), joinKeyFieldNames, bucketKeyFieldNames);
+
+ strategy = new BucketShuffleStrategy(numBuckets);
+ bucketShufflePartitioner = new BucketShufflePartitioner(strategy, extractor);
+ }
+
+ if (strategy != null) {
+ LOG.info("Paimon connector is using bucket shuffle partitioning strategy.");
+ }
+ return new FileStoreLookupFunction(table, projection, joinKey, predicate, strategy);
}
private FileStoreTable timeTravelDisabledTable(FileStoreTable table) {
@@ -361,4 +401,15 @@ public abstract class BaseDataTableSource extends FlinkTableSource
public boolean isUnbounded() {
return unbounded;
}
+
+ @Override
+ public Optional<InputDataPartitioner> getPartitioner() {
+ return Optional.ofNullable(bucketShufflePartitioner);
+ }
+
+ private boolean supportBucketShufflePartitioner(
+ List<String> joinKeyFieldNames, List<String> bucketKeyFieldNames) {
+ return BucketMode.HASH_FIXED.equals(((FileStoreTable) table).bucketMode())
+ && new HashSet<>(joinKeyFieldNames).containsAll(bucketKeyFieldNames);
+ }
}
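The essential invariant in this wiring is that the `InputDataPartitioner` handed to Flink via `getPartitioner()` and the `ShuffleStrategy` handed to `FileStoreLookupFunction` are built around the same `BucketShuffleStrategy`, so a row routed to subtask s always finds its bucket in s's cache. A compact standalone check of that invariant follows; the types come from this commit, while the driver loop is mine and mirrors the symmetry test further below.

```java
import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;

import java.util.Set;

/** Verifies that routing and cache scoping agree for every bucket and join-key hash. */
public class WiringInvariantCheck {

    public static void main(String[] args) {
        int numBuckets = 5;
        int numSubtasks = 3;
        BucketShuffleStrategy strategy = new BucketShuffleStrategy(numBuckets);
        for (int bucket = 0; bucket < numBuckets; bucket++) {
            for (int hash = 0; hash < 100; hash++) {
                int subtask = strategy.getTargetSubtaskId(bucket, hash, numSubtasks);
                Set<Integer> cached = strategy.getRequiredCacheBucketIds(subtask, numSubtasks);
                if (!cached.contains(bucket)) {
                    throw new AssertionError("Routing and caching disagree for bucket " + bucket);
                }
            }
        }
        System.out.println("Every routed bucket is cached on its target subtask.");
    }
}
```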
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketIdExtractorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketIdExtractorTest.java
new file mode 100644
index 0000000000..c5c2759c26
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketIdExtractorTest.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Random;
+import java.util.function.Function;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The test for {@link BucketIdExtractor}. */
+public class BucketIdExtractorTest extends CatalogITCaseBase {
+
+ private static final int BUCKET_NUMBER = 5;
+
+ private static final int COL_NUMBER = 5;
+
+ private static final int ROW_NUMBER = 100;
+
+ @Test
+ public void testJoinKeyEqualToSingleBucketKey() throws Exception {
+ Random seed = new Random();
+ int bucketId = seed.nextInt(COL_NUMBER);
+ int bucketKeyIndex = seed.nextInt(5);
+ String bucketKeyName = "col" + (bucketKeyIndex + 1);
+ FileStoreTable table = createTestTable(bucketKeyName);
+ List<List<Object>> bucketKeyRows =
+ getGroundTruthBucketKeyRows(
+ table,
+ bucketId,
+ createBucketKeyGetter(Collections.singletonList(bucketKeyIndex)));
+ List<String> joinKeyNames = Collections.singletonList(bucketKeyName);
+ List<RowData> joinKeyRows =
+ generateJoinKeyRows(
+ bucketKeyRows, bucketKeyRow -> GenericRowData.of(bucketKeyRow.get(0)));
+ BucketIdExtractor bucketIdExtractor =
+ new BucketIdExtractor(BUCKET_NUMBER, table.schema(), joinKeyNames, joinKeyNames);
+ checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
+ }
+
+ @Test
+ public void testJoinKeysContainSingleKey() throws Exception {
+ Random seed = new Random();
+ List<List<Integer>> joinKeyIndexes = getColumnIndexCombinations();
+ int bucketId = seed.nextInt(COL_NUMBER);
+ List<Integer> joinKeyIndex = joinKeyIndexes.get(seed.nextInt(joinKeyIndexes.size()));
+ int bucketKeyIndex = joinKeyIndex.get(1);
+ String bucketKeyName = "col" + bucketKeyIndex;
+ FileStoreTable table = createTestTable(bucketKeyName);
+ List<List<Object>> bucketKeyRows =
+ getGroundTruthBucketKeyRows(
+ table,
+ bucketId,
+ createBucketKeyGetter(Collections.singletonList(bucketKeyIndex - 1)));
+ List<String> bucketKeyNames = Collections.singletonList(bucketKeyName);
+ List<String> joinKeyNames = Arrays.asList("col" + joinKeyIndex.get(0), bucketKeyName);
+ List<RowData> joinKeyRows =
+ generateJoinKeyRows(
+ bucketKeyRows,
+ bucketKeyRow ->
+ GenericRowData.of(
+ generateFakeColumnValue(joinKeyIndex.get(0)),
+ bucketKeyRow.get(0)));
+ BucketIdExtractor bucketIdExtractor =
+ new BucketIdExtractor(BUCKET_NUMBER, table.schema(), joinKeyNames, bucketKeyNames);
+ checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
+ }
+
+ @Test
+ public void testJoinKeysEqualToMultiBucketKeys() throws Exception {
+ Random seed = new Random();
+ List<List<Integer>> bucketKeyIndexes = getColumnIndexCombinations();
+ int bucketId = seed.nextInt(COL_NUMBER);
+ List<Integer> bucketKeyIndex = bucketKeyIndexes.get(seed.nextInt(bucketKeyIndexes.size()));
+ String bucketKeyName = "col" + bucketKeyIndex.get(0) + ",col" + bucketKeyIndex.get(1);
+ FileStoreTable table = createTestTable(bucketKeyName);
+ List<List<Object>> bucketKeyRows =
+ getGroundTruthBucketKeyRows(
+ table,
+ bucketId,
+ createBucketKeyGetter(
+ Arrays.asList(
+ bucketKeyIndex.get(0) - 1, bucketKeyIndex.get(1) - 1)));
+ List<String> joinKeyNames = Arrays.asList(bucketKeyName.split(","));
+ List<RowData> joinKeyRows =
+ generateJoinKeyRows(
+ bucketKeyRows,
+ bucketKeyRow ->
+ GenericRowData.of(bucketKeyRow.get(0), bucketKeyRow.get(1)));
+ BucketIdExtractor bucketIdExtractor =
+ new BucketIdExtractor(BUCKET_NUMBER, table.schema(), joinKeyNames, joinKeyNames);
+ checkCorrectness(bucketIdExtractor, bucketId, joinKeyRows);
+ }
+
+ private List<RowData> generateJoinKeyRows(
+ List<List<Object>> bucketKeyRows, Function<List<Object>, RowData> converter) {
+ List<RowData> joinKeyRows = new ArrayList<>();
+ for (List<Object> bucketKeyRow : bucketKeyRows) {
+ joinKeyRows.add(converter.apply(bucketKeyRow));
+ }
+ return joinKeyRows;
+ }
+
+ private void checkCorrectness(
+ BucketIdExtractor extractor, int targetBucketId, List<RowData> joinKeyRows) {
+ for (RowData joinKeyRow : joinKeyRows) {
+ assertThat(extractor.extractBucketId(joinKeyRow)).isEqualTo(targetBucketId);
+ }
+ }
+
+ private List<List<Object>> getGroundTruthBucketKeyRows(
+ FileStoreTable table, int bucketId, Function<InternalRow, List<Object>> bucketKeyGetter)
+ throws IOException {
+ List<ManifestEntry> files = table.store().newScan().withBucket(bucketId).plan().files();
+ List<List<Object>> bucketKeyRows = new ArrayList<>();
+ for (ManifestEntry file : files) {
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(file.partition())
+ .withBucket(file.bucket())
+ .withDataFiles(Collections.singletonList(file.file()))
+ .withBucketPath("not used")
+ .build();
+ table.newReadBuilder()
+ .newRead()
+ .createReader(dataSplit)
+ .forEachRemaining(
+ internalRow -> {
+ bucketKeyRows.add(bucketKeyGetter.apply(internalRow));
+ });
+ }
+ return bucketKeyRows;
+ }
+
+ private FileStoreTable createTestTable(String bucketKey) throws Exception {
+ String tableName = "Test";
+ String ddl =
+ String.format(
+ "CREATE TABLE %s (col1 INT, col2 STRING, col3 FLOAT,
col4 INT, col5 BOOLEAN ) WITH"
+ + " ('bucket'='%s'",
+ tableName, BUCKET_NUMBER);
+ if (bucketKey != null) {
+ ddl += ", 'bucket-key' = '" + bucketKey + "')";
+ }
+ batchSql(ddl);
+ Random seed = new Random();
+ StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s VALUES ", tableName));
+ for (int index = 1; index < ROW_NUMBER; ++index) {
+ dml.append(
+ String.format(
+ "(%s, '%s', %s, %s, %s), ",
+ seed.nextInt(ROW_NUMBER),
+ seed.nextInt(ROW_NUMBER),
+ 101.1F + seed.nextInt(50),
+ seed.nextInt(ROW_NUMBER),
+ (seed.nextBoolean() ? "true" : "false")));
+ }
+ dml.append(
+ String.format(
+ "(%s, '%s', %s, %s, %s)",
+ seed.nextInt(ROW_NUMBER),
+ seed.nextInt(ROW_NUMBER),
+ 101.1F + seed.nextInt(50),
+ seed.nextInt(ROW_NUMBER),
+ (seed.nextBoolean() ? "true" : "false")));
+ batchSql(dml.toString());
+ return paimonTable(tableName);
+ }
+
+ private Function<InternalRow, List<Object>> createBucketKeyGetter(
+ List<Integer> bucketKeyIndexes) {
+ return row -> {
+ List<Object> bucketKeys = new ArrayList<>();
+ for (Integer bucketKeyIndex : bucketKeyIndexes) {
+ switch (bucketKeyIndex) {
+ case 0:
+ bucketKeys.add(row.getInt(0));
+ break;
+ case 1:
+ bucketKeys.add(StringData.fromString(row.getString(1).toString()));
+ break;
+ case 2:
+ bucketKeys.add(row.getFloat(2));
+ break;
+ case 3:
+ bucketKeys.add(row.getInt(3));
+ break;
+ case 4:
+ bucketKeys.add(row.getBoolean(4));
+ break;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+ return bucketKeys;
+ };
+ }
+
+ private List<List<Integer>> getColumnIndexCombinations() {
+ List<List<Integer>> bucketIndexes = new ArrayList<>();
+ for (int i = 1; i <= BUCKET_NUMBER; ++i) {
+ for (int j = i + 1; j <= BUCKET_NUMBER; ++j) {
+ bucketIndexes.add(Arrays.asList(i, j));
+ }
+ }
+ return bucketIndexes;
+ }
+
+ private Object generateFakeColumnValue(Integer columnIndex) {
+ switch (columnIndex) {
+ case 0:
+ return 1;
+ case 1:
+ return StringData.fromString("Test");
+ case 2:
+ return 1.21F;
+ case 3:
+ return 10;
+ case 4:
+ return false;
+ default:
+ throw new UnsupportedOperationException();
+ }
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShufflePartitionerTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShufflePartitionerTest.java
new file mode 100644
index 0000000000..5b26aca25f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShufflePartitionerTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.flink.lookup.partitioner.BucketIdExtractor;
+import org.apache.paimon.flink.lookup.partitioner.BucketShufflePartitioner;
+import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.DataSplit;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** The test for {@link BucketShufflePartitioner}. */
+public class BucketShufflePartitionerTest extends CatalogITCaseBase {
+
+ @Test
+ public void testBucketNumLargerThanLookupJoinParallelism() throws Exception {
+ int numBuckets = 100;
+ int lookupJoinParallelism = 27;
+ FileStoreTable table = createTestTable(numBuckets);
+ BucketShuffleStrategy strategy = new BucketShuffleStrategy(numBuckets);
+ BucketShufflePartitioner bucketShufflePartitioner =
+ createBucketShufflePartitioner(table, numBuckets, strategy);
+ List<Tuple2<RowData, Integer>> joinKeysList =
+ getGroundTruthJoinKeysWithBucketId(table, numBuckets);
+ Map<Integer, List<Tuple2<RowData, Integer>>> partitionResult = new HashMap<>();
+ for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysList) {
+ int subtaskId =
+ bucketShufflePartitioner.partition(
+ joinKeysWithBucketId.f0, lookupJoinParallelism);
+ partitionResult.compute(
+ subtaskId,
+ (key, currentValue) -> {
+ List<Tuple2<RowData, Integer>> newValue =
+ (currentValue != null) ? currentValue : new ArrayList<>();
+ newValue.add(joinKeysWithBucketId);
+ return newValue;
+ });
+ }
+ for (int subtaskId = 0; subtaskId < lookupJoinParallelism; ++subtaskId) {
+ List<Tuple2<RowData, Integer>> joinKeysWithBucketIdList =
+ partitionResult.get(subtaskId);
+ Set<Integer> requiredCacheBucketIds =
+ strategy.getRequiredCacheBucketIds(subtaskId, lookupJoinParallelism);
+ for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysWithBucketIdList) {
+ assertThat(requiredCacheBucketIds).contains(joinKeysWithBucketId.f1);
+ assertThat(joinKeysWithBucketId.f1 % lookupJoinParallelism).isEqualTo(subtaskId);
+ }
+ }
+ }
+
+ @Test
+ public void testBucketNumEqualToLookupJoinParallelism() throws Exception {
+ int numBuckets = 78;
+ int lookupJoinParallelism = 78;
+ FileStoreTable table = createTestTable(numBuckets);
+ BucketShuffleStrategy strategy = new BucketShuffleStrategy(numBuckets);
+ BucketShufflePartitioner bucketShufflePartitioner =
+ createBucketShufflePartitioner(table, numBuckets, strategy);
+ List<Tuple2<RowData, Integer>> joinKeysList =
+ getGroundTruthJoinKeysWithBucketId(table, numBuckets);
+ Map<Integer, List<Tuple2<RowData, Integer>>> partitionResult = new HashMap<>();
+ for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysList) {
+ int subtaskId =
+ bucketShufflePartitioner.partition(
+ joinKeysWithBucketId.f0, lookupJoinParallelism);
+ partitionResult.compute(
+ subtaskId,
+ (key, currentValue) -> {
+ List<Tuple2<RowData, Integer>> newValue =
+ (currentValue != null) ? currentValue : new ArrayList<>();
+ newValue.add(joinKeysWithBucketId);
+ return newValue;
+ });
+ }
+ for (int subtaskId = 0; subtaskId < lookupJoinParallelism; ++subtaskId) {
+ List<Tuple2<RowData, Integer>> joinKeysWithBucketIdList =
+ partitionResult.get(subtaskId);
+ Set<Integer> requiredCacheBucketIds =
+ strategy.getRequiredCacheBucketIds(subtaskId, lookupJoinParallelism);
+ for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysWithBucketIdList) {
+ assertThat(requiredCacheBucketIds.size()).isOne();
+ assertThat(requiredCacheBucketIds).contains(joinKeysWithBucketId.f1);
+ assertThat(joinKeysWithBucketId.f1 % lookupJoinParallelism).isEqualTo(subtaskId);
+ }
+ }
+ }
+
+ @Test
+ public void testBucketNumLessThanLookupJoinParallelism() throws Exception {
+ int numBuckets = 4;
+ int lookupJoinParallelism = 15;
+ FileStoreTable table = createTestTable(numBuckets);
+ BucketShuffleStrategy strategy = new BucketShuffleStrategy(numBuckets);
+ BucketShufflePartitioner bucketShufflePartitioner =
+ createBucketShufflePartitioner(table, numBuckets, strategy);
+ List<Tuple2<RowData, Integer>> joinKeysList =
+ getGroundTruthJoinKeysWithBucketId(table, numBuckets);
+ Map<Integer, List<Tuple2<RowData, Integer>>> partitionResult = new HashMap<>();
+ for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysList) {
+ int subtaskId =
+ bucketShufflePartitioner.partition(
+ joinKeysWithBucketId.f0, lookupJoinParallelism);
+ partitionResult.compute(
+ subtaskId,
+ (key, currentValue) -> {
+ List<Tuple2<RowData, Integer>> newValue =
+ (currentValue != null) ? currentValue : new ArrayList<>();
+ newValue.add(joinKeysWithBucketId);
+ return newValue;
+ });
+ }
+ for (int subtaskId = 0; subtaskId < lookupJoinParallelism; ++subtaskId) {
+ List<Tuple2<RowData, Integer>> joinKeysWithBucketIdList =
+ partitionResult.get(subtaskId);
+ Set<Integer> requiredCacheBucketIds =
+ strategy.getRequiredCacheBucketIds(subtaskId, lookupJoinParallelism);
+ for (Tuple2<RowData, Integer> joinKeysWithBucketId : joinKeysWithBucketIdList) {
+ assertThat(requiredCacheBucketIds).contains(joinKeysWithBucketId.f1);
+ assertThat(subtaskId % numBuckets).isEqualTo(joinKeysWithBucketId.f1);
+ }
+ }
+ }
+
+ private List<Tuple2<RowData, Integer>> getGroundTruthJoinKeysWithBucketId(
+ FileStoreTable table, int numBuckets) throws IOException {
+ List<Tuple2<RowData, Integer>> joinKeyRows = new ArrayList<>();
+ Random random = new Random();
+ for (int bucketId = 0; bucketId < numBuckets; ++bucketId) {
+ ManifestEntry file = table.store().newScan().withBucket(bucketId).plan().files().get(0);
+ DataSplit dataSplit =
+ DataSplit.builder()
+ .withPartition(file.partition())
+ .withBucket(file.bucket())
+ .withDataFiles(Collections.singletonList(file.file()))
+ .withBucketPath("not used")
+ .build();
+ int bucket = bucketId;
+ table.newReadBuilder()
+ .newRead()
+ .createReader(dataSplit)
+ .forEachRemaining(
+ internalRow -> {
+ joinKeyRows.add(
+ Tuple2.of(
+ GenericRowData.of(
+ String.valueOf(random.nextInt(numBuckets)),
+ internalRow.getInt(1)),
+ bucket));
+ });
+ }
+ return joinKeyRows;
+ }
+
+ private BucketShufflePartitioner createBucketShufflePartitioner(
+ FileStoreTable table, int numBuckets, BucketShuffleStrategy strategy) {
+ BucketIdExtractor bucketIdExtractor =
+ new BucketIdExtractor(
+ numBuckets,
+ table.schema(),
+ Arrays.asList("col1", "col2"),
+ Collections.singletonList("col2"));
+ return new BucketShufflePartitioner(strategy, bucketIdExtractor);
+ }
+
+ private FileStoreTable createTestTable(int bucketNum) throws Exception {
+ String tableName = "Test";
+ String ddl =
+ String.format(
+ "CREATE TABLE %s (col1 STRING, col2 INT, col3 FLOAT)
WITH"
+ + " ('bucket'='%s', 'bucket-key' = 'col2')",
+ tableName, bucketNum);
+ batchSql(ddl);
+ StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s VALUES ", tableName));
+ for (int index = 1; index < 1000; ++index) {
+ dml.append(String.format("('%s', %s, %s), ", index, index, 101.1F));
+ }
+ dml.append(String.format("('%s', %s, %s)", 1000, 1000, 101.1F));
+ batchSql(dml.toString());
+ return paimonTable(tableName);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShuffleStrategyTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShuffleStrategyTest.java
new file mode 100644
index 0000000000..efe9694359
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/BucketShuffleStrategyTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.flink.lookup.partitioner.BucketShuffleStrategy;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.catchThrowable;
+
+/** The test for {@link BucketShuffleStrategy}. */
+public class BucketShuffleStrategyTest {
+
+ @Test
+ public void testConstructorWithInvalidBuckets() {
+ Throwable thrown = catchThrowable(() -> new BucketShuffleStrategy(0));
+ assertThat(thrown)
+ .isInstanceOf(IllegalStateException.class)
+ .hasMessage("Number of buckets should be positive.");
+ }
+
+ @Test
+ public void testGetTargetSubtaskIdWithMoreBucketsThanSubtasks() {
+ BucketShuffleStrategy bucketShuffleStrategy = new BucketShuffleStrategy(10);
+ int targetSubtaskId = bucketShuffleStrategy.getTargetSubtaskId(5, 12345, 4);
+ assertThat(targetSubtaskId).isEqualTo(1);
+ }
+
+ @Test
+ public void testGetTargetSubtaskIdWithLessBucketsThanSubtasks() {
+ BucketShuffleStrategy bucketShuffleStrategy1 = new BucketShuffleStrategy(3);
+ int targetSubtaskId1 = bucketShuffleStrategy1.getTargetSubtaskId(2, 12345, 10);
+ int expectedTargetSubtaskId1 = ((12345 % 3) * 3 + 2);
+ assertThat(targetSubtaskId1).isEqualTo(expectedTargetSubtaskId1);
+ BucketShuffleStrategy bucketShuffleStrategy2 = new BucketShuffleStrategy(2);
+ int targetSubtaskId2 = bucketShuffleStrategy2.getTargetSubtaskId(0, 54321, 7);
+ int expectedTargetSubtaskId2 = (54321 % 4) * 2;
+ assertThat(targetSubtaskId2).isEqualTo(expectedTargetSubtaskId2);
+ }
+
+ @Test
+ public void testGetRequiredCacheBucketIdsWithMoreBucketsThanSubtasks() {
+ BucketShuffleStrategy bucketShuffleStrategy = new BucketShuffleStrategy(10);
+ Set<Integer> bucketIds = bucketShuffleStrategy.getRequiredCacheBucketIds(1, 4);
+ // Subtask ID 1 should cache all buckets where bucketId % 4 == 1
+ assertThat(bucketIds).containsExactlyInAnyOrder(1, 5, 9);
+ }
+
+ @Test
+ public void testGetRequiredCacheBucketIdsWithLessBucketsThanSubtasks() {
+ BucketShuffleStrategy bucketShuffleStrategy = new BucketShuffleStrategy(3);
+ Set<Integer> bucketIds = bucketShuffleStrategy.getRequiredCacheBucketIds(1, 10);
+ // Subtask ID 1 should cache the bucket where bucketId == 1 % 3
+ assertThat(bucketIds).containsExactlyInAnyOrder(1);
+ }
+
+ @Test
+ public void testSymmetryOfGetTargetSubtaskIdAndGetRequiredCacheBucketIds() {
+ for (int numBuckets = 1; numBuckets < 100; ++numBuckets) {
+ for (int numSubtasks = 1; numSubtasks < 100; ++numSubtasks) {
+ testCorrectness(numBuckets, numSubtasks);
+ }
+ }
+ }
+
+ private void testCorrectness(int numBuckets, int numSubtasks) {
+ BucketShuffleStrategy bucketShuffleStrategy = new BucketShuffleStrategy(numBuckets);
+ Set<Integer> allTargetSubtaskIds = new HashSet<>();
+ for (int bucketId = 0; bucketId < numBuckets; bucketId++) {
+ for (int joinKeyHash = 0; joinKeyHash < numSubtasks; ++joinKeyHash) {
+ int subtaskId =
+ bucketShuffleStrategy.getTargetSubtaskId(
+ bucketId, joinKeyHash, numSubtasks);
+ assertThat(subtaskId).isNotNegative().isLessThan(numSubtasks);
+ allTargetSubtaskIds.add(subtaskId);
+ Set<Integer> requiredCacheBucketIds =
+ bucketShuffleStrategy.getRequiredCacheBucketIds(subtaskId, numSubtasks);
+ assertThat(requiredCacheBucketIds).contains(bucketId);
+ }
+ }
+ assertThat(allTargetSubtaskIds).hasSize(numSubtasks);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
index af5dec0f87..dbf9f99a32 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/FileStoreLookupFunctionTest.java
@@ -105,7 +105,11 @@ public class FileStoreLookupFunctionTest {
private FileStoreLookupFunction createLookupFunction(
FileStoreTable table, boolean joinEqualPk) {
return new FileStoreLookupFunction(
- table, new int[] {0, 1}, joinEqualPk ? new int[] {0, 1} : new int[] {1}, null);
+ table,
+ new int[] {0, 1},
+ joinEqualPk ? new int[] {0, 1} : new int[] {1},
+ null,
+ null);
}
private FileStoreTable createFileStoreTable(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
new file mode 100644
index 0000000000..a4e485d829
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupJoinBucketShuffleITCase.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.lookup;
+
+import org.apache.paimon.flink.CatalogITCaseBase;
+
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** ITCase for custom lookup shuffle. */
+public class LookupJoinBucketShuffleITCase extends CatalogITCaseBase {
+
+ private static final int BUCKET_NUMBER = 5;
+
+ private static final int ROW_NUMBER = 100;
+
+ /**
+ * Test the non-pk full cached table where join key count is 1 and join key equals the bucket
+ * key.
+ */
+ @Test
+ public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase1() throws Exception {
+ String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col1");
+ String query =
+ ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1,
D.col2 FROM T JOIN DIM "
+ + "for system_time as of T.proc_time AS D ON
T.col1 = D.col1")
+ .replace("DIM", nonPrimaryKeyDimTable);
+ testBucketNumberCases(query);
+ }
+
+ /**
+ * Test the non-pk full cached table where join key count is 2, join keys
equal the bucket keys.
+ */
+ @Test
+ public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase2()
throws Exception {
+ String nonPrimaryKeyDimTable =
createNonPrimaryKeyDimTable("col1,col2");
+ String query =
+ ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1,
D.col2 FROM T JOIN DIM "
+ + "for system_time as of T.proc_time AS D ON
T.col1 = D.col1 AND T.col2 = D.col2")
+ .replace("DIM", nonPrimaryKeyDimTable);
+ testBucketNumberCases(query);
+ }
+
+ /**
+ * Test the non-pk full cached table where join key count is 2, bucket key
is one of join key.
+ */
+ @Test
+ public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase3()
throws Exception {
+ String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col2");
+ String query =
+ ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1,
D.col2 FROM T JOIN DIM "
+ + "for system_time as of T.proc_time AS D ON
T.col1 = D.col1 AND T.col2 = D.col2")
+ .replace("DIM", nonPrimaryKeyDimTable);
+ testBucketNumberCases(query);
+ }
+
+ /**
+ * Test the non-pk full cached table where join keys contain constant
keys, bucket key is one of
+ * join key.
+ */
+ @Test
+ public void testBucketShuffleForNonPrimaryTableInFullCacheModeCase4()
throws Exception {
+ String nonPrimaryKeyDimTable = createNonPrimaryKeyDimTable("col2");
+ String query =
+ ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1,
D.col2 FROM T JOIN DIM "
+ + "for system_time as of T.proc_time AS D ON
T.col1 = D.col1 AND T.col2 = D.col2 AND "
+ + "D.col4 = CAST('2024-06-09' AS DATE) AND
D.col5 = 123.45 ")
+ .replace("DIM", nonPrimaryKeyDimTable);
+ testBucketNumberCases(query);
+ }
+
+    /** Test the pk full cached table where the join key count is 1 and the join key equals the bucket key. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase1() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(1, true, "col1");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk full cached table where the join key count is 2 and the join keys equal the bucket keys. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase2() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, true, "col1,col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk full cached table where the join key count is 2 and the bucket key is one of the join keys. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase3() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, true, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /**
+     * Test the pk full cached table where the join keys contain constant keys and the bucket key
+     * is one of the join keys.
+     */
+    @Test
+    public void testBucketShuffleForPrimaryTableInFullCacheModeCase4() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, true, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2 AND "
+                                + "D.col4 = CAST('2024-06-09' AS DATE) AND D.col5 = 123.45 ")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk auto cached table where the join key count is 1 and the join key equals the bucket key. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase1() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(1, false, "col1");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk auto cached table where the join key count is 2 and the join keys equal the bucket keys. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase2() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, false, "col1,col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1 "
+                                + "AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /** Test the pk auto cached table where the join key count is 2 and the bucket key is one of the join keys. */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase3() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, false, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1 "
+                                + "AND T.col2 = D.col2")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    /**
+     * Test the pk auto cached table where the join keys contain constant keys and the bucket key
+     * is one of the join keys.
+     */
+    @Test
+    public void testBucketShuffleForPrimaryTableInAutoCacheModeCase4() throws Exception {
+        String primaryKeyDimTable = createPrimaryKeyDimTable(2, false, "col2");
+        String query =
+                ("SELECT /*+ LOOKUP('table'='D', 'shuffle'='true') */ T.col1, D.col2 FROM T JOIN DIM "
+                                + "for system_time as of T.proc_time AS D ON T.col1 = D.col1 AND T.col2 = D.col2 AND "
+                                + "D.col4 = CAST('2024-06-09' AS DATE) AND D.col5 = 123.45")
+                        .replace("DIM", primaryKeyDimTable);
+        testBucketNumberCases(query);
+    }
+
+    private void testBucketNumberCases(String query) throws Exception {
+        List<Row> groundTruthRows = getGroundTruthRows();
+        // Test the case that bucket number = lookup parallelism.
+        sEnv.getConfig()
+                .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, BUCKET_NUMBER);
+        List<Row> result1 = streamSqlBlockIter(query).collect(ROW_NUMBER);
+        assertThat(result1).containsExactlyInAnyOrderElementsOf(groundTruthRows);
+        // Test the case that bucket number > lookup parallelism.
+        sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 3);
+        List<Row> result2 = streamSqlBlockIter(query).collect(ROW_NUMBER);
+        assertThat(result2).containsExactlyInAnyOrderElementsOf(groundTruthRows);
+        // Test the case that bucket number < lookup parallelism.
+        sEnv.getConfig().set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 8);
+        List<Row> result3 = streamSqlBlockIter(query).collect(ROW_NUMBER);
+        assertThat(result3).containsExactlyInAnyOrderElementsOf(groundTruthRows);
+    }
+
+    private String createPrimaryKeyDimTable(
+            int primaryKeyNumber, boolean fullCache, String bucketKey) {
+        createSourceTable();
+        String tableName = "DIM_PRIMARY";
+        if (fullCache) {
+            tableName += "_full_cache";
+        } else {
+            tableName += "_auto";
+        }
+        if (bucketKey != null) {
+            tableName += "_" + bucketKey.split(",").length;
+        }
+        String ddl;
+        if (primaryKeyNumber == 1) {
+            tableName += "_1";
+            ddl =
+                    String.format(
+                            "CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, col4 DATE, col5 DECIMAL(10, 2), "
+                                    + "PRIMARY KEY (col1) NOT ENFORCED) WITH"
+                                    + " ('continuous.discovery-interval'='1 ms', 'bucket'='%s'",
+                            tableName, BUCKET_NUMBER);
+        } else {
+            tableName += "_2";
+            ddl =
+                    String.format(
+                            "CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, col4 DATE, col5 DECIMAL(10, 2), "
+                                    + "PRIMARY KEY (col1, col2) NOT ENFORCED) WITH"
+                                    + " ('continuous.discovery-interval'='1 ms', 'bucket'='%s'",
+                            tableName, BUCKET_NUMBER);
+        }
+        if (bucketKey != null) {
+            ddl += ", 'bucket-key' = '" + bucketKey + "'";
+        }
+        if (fullCache) {
+            ddl += " ,'lookup.cache' = 'full')";
+        } else {
+            ddl += " )";
+        }
+        batchSql(ddl);
+        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s VALUES ", tableName));
+        for (int index = 1; index < ROW_NUMBER; ++index) {
+            dml.append(
+                    String.format(
+                            "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45), ",
+                            index, index * 10, index * 10));
+        }
+        dml.append(
+                String.format(
+                        "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45)",
+                        ROW_NUMBER, ROW_NUMBER * 10, ROW_NUMBER * 10));
+        batchSql(dml.toString());
+        return tableName;
+    }
+
+    private String createNonPrimaryKeyDimTable(String bucketKey) {
+        createSourceTable();
+        String tableName = "DIM";
+        if (bucketKey != null) {
+            tableName += "_" + bucketKey.split(",").length;
+        }
+        String ddl =
+                String.format(
+                        "CREATE TABLE %s (col1 INT, col2 STRING, col3 INT, col4 DATE,"
+                                + " col5 DECIMAL(10, 2)) WITH"
+                                + " ('continuous.discovery-interval'='1 ms', 'bucket'='%s'",
+                        tableName, BUCKET_NUMBER);
+        if (bucketKey != null) {
+            ddl += ", 'bucket-key' = '" + bucketKey + "'";
+        }
+        // Close the WITH clause opened above, also when no bucket key is given.
+        ddl += ")";
+        batchSql(ddl);
+        StringBuilder dml = new StringBuilder(String.format("INSERT INTO %s VALUES ", tableName));
+        for (int index = 1; index < ROW_NUMBER; ++index) {
+            dml.append(
+                    String.format(
+                            "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45), ",
+                            index, index * 10, index * 10));
+        }
+        dml.append(
+                String.format(
+                        "(%s, '%s', %s, CAST('2024-06-09' AS DATE), 123.45)",
+                        ROW_NUMBER, ROW_NUMBER * 10, ROW_NUMBER * 10));
+        batchSql(dml.toString());
+        return tableName;
+    }
+
+    private void createSourceTable() {
+        String ddl = "CREATE TABLE T (col1 INT, col2 STRING, col3 INT, `proc_time` AS PROCTIME())";
+        batchSql(ddl);
+        StringBuilder dml = new StringBuilder("INSERT INTO T VALUES ");
+        for (int index = 1; index < ROW_NUMBER; ++index) {
+            dml.append(String.format("(%s, '%s', %s), ", index, index * 10, index * 10));
+        }
+        dml.append(String.format("(%s, '%s', %s)", ROW_NUMBER, ROW_NUMBER * 10, ROW_NUMBER * 10));
+        batchSql(dml.toString());
+    }
+
+    private List<Row> getGroundTruthRows() {
+        List<Row> results = new ArrayList<>();
+        for (int index = 1; index <= ROW_NUMBER; ++index) {
+            results.add(Row.of(index, String.valueOf(index * 10)));
+        }
+        return results;
+    }
+}
diff --git a/paimon-flink/paimon-flink1-common/pom.xml b/paimon-flink/paimon-flink1-common/pom.xml
index d580b5dcda..1cfca5226e 100644
--- a/paimon-flink/paimon-flink1-common/pom.xml
+++ b/paimon-flink/paimon-flink1-common/pom.xml
@@ -49,6 +49,12 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLookupCustomShuffle.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLookupCustomShuffle.java
new file mode 100644
index 0000000000..6e31c1b800
--- /dev/null
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/flink/table/connector/source/abilities/SupportsLookupCustomShuffle.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.connector.source.abilities;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Partitioner;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.data.RowData;
+
+import java.io.Serializable;
+import java.util.Optional;
+
+/**
+ * This interface allows connectors to provide a custom partitioning strategy for the data that is
+ * fed into the {@link LookupTableSource}. This enables the Flink planner to optimize the
+ * distribution of the input stream across the different subtasks of the lookup-join node to match
+ * the distribution of data in the external data source.
+ */
+@PublicEvolving
+public interface SupportsLookupCustomShuffle {
+    /**
+     * Retrieves a custom partitioner that will be applied to the input stream of the lookup-join
+     * node.
+     *
+     * @return An {@link InputDataPartitioner} that defines how records should be distributed
+     *     across the different subtasks. If the connector expects the input data to remain in its
+     *     original distribution, {@link Optional#empty()} should be returned.
+     */
+    Optional<InputDataPartitioner> getPartitioner();
+
+    /**
+     * This interface provides custom partitioning logic for {@link RowData} records. We do not use
+     * {@link Partitioner} directly because the input data is always of RowData type, and all join
+     * keys need to be extracted from the input data before it is sent to the partitioner.
+     */
+    @PublicEvolving
+    interface InputDataPartitioner extends Serializable {
+        /**
+         * Determines the partition id for each input record.
+         *
+         * <p>The input data is projected to include only the join keys before it is emitted to
+         * this partitioner.
+         *
+         * @param joinKeys The extracted join keys of an input record.
+         * @param numPartitions The total number of partitions.
+         * @return An integer representing the partition id to which the record should be sent.
+         */
+        int partition(RowData joinKeys, int numPartitions);
+
+        /**
+         * Returns information about the determinism of this partitioner.
+         *
+         * <p>It returns true if and only if a call to the {@link #partition(RowData, int)} method
+         * is guaranteed to always return the same result given the same joinKeys. If the
+         * partitioning logic depends on functions that are not purely deterministic, such as
+         * <code>random(), date(), now(), ...</code>, this method must return false.
+         *
+         * <p>If this method returns false, the planner may not apply this partitioner in upsert
+         * mode, to avoid out-of-order changelog events.
+         */
+        default boolean isDeterministic() {
+            return true;
+        }
+    }
+}
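For illustration, a connector-side implementation of this interface could look like the following sketch. The single INT join key and the `numBuckets` field are assumptions made for the example, and the class name is invented; Paimon's actual implementation is the `BucketShufflePartitioner` added elsewhere in this commit.

```java
import org.apache.flink.table.connector.source.abilities.SupportsLookupCustomShuffle.InputDataPartitioner;
import org.apache.flink.table.data.RowData;

/** Hypothetical partitioner: assumes the projected join-key row holds one INT bucket key. */
final class BucketAlignedPartitionerSketch implements InputDataPartitioner {

    private final int numBuckets;

    BucketAlignedPartitionerSketch(int numBuckets) {
        this.numBuckets = numBuckets;
    }

    @Override
    public int partition(RowData joinKeys, int numPartitions) {
        // Derive the bucket from the (assumed INT) join key, then fold buckets onto partitions,
        // so records of the same bucket always reach the same lookup subtask.
        int bucket = Math.floorMod(joinKeys.getInt(0), numBuckets);
        return bucket % numPartitions;
    }

    // isDeterministic() keeps its default of true: the mapping above is a pure function of the
    // join keys, so the planner may apply this partitioner even in upsert mode.
}
```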
diff --git a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
similarity index 71%
rename from paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
rename to paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
index 460fea55ad..1087dcb65c 100644
--- a/paimon-flink/paimon-flink-1.18/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ b/paimon-flink/paimon-flink1-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -19,6 +19,10 @@
package org.apache.paimon.flink.utils;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.FunctionContext;
+
+import javax.annotation.Nullable;
/** Utility methods about Flink runtime context to resolve compatibility issues. */
public class RuntimeContextUtils {
@@ -29,4 +33,16 @@ public class RuntimeContextUtils {
public static int getIndexOfThisSubtask(RuntimeContext context) {
return context.getIndexOfThisSubtask();
}
+
+    public static @Nullable Integer getNumberOfParallelSubtasks(FunctionContext context) {
+        return null;
+    }
+
+    public static @Nullable Integer getIndexOfThisSubtask(FunctionContext context) {
+        return null;
+    }
+
+    public static boolean preferCustomShuffle(LookupTableSource.LookupContext context) {
+        return false;
+    }
}
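This Flink 1.x variant returns `null`/`false` because `FunctionContext#getTaskInfo` and `LookupContext#preferCustomShuffle` only exist from Flink 2.0 onward (compare the paimon-flink2-common counterpart below). A caller therefore has to treat the results as "unknown" and fall back; a hypothetical usage sketch, with the method and parameter names invented for illustration:

```java
import org.apache.flink.table.functions.FunctionContext;

import org.apache.paimon.flink.utils.RuntimeContextUtils;

/** Hypothetical caller pattern around the version shim; not code from this commit. */
final class ShimUsageSketch {

    static int resolveNumSubtasks(FunctionContext context, int fallbackFromRuntimeContext) {
        // On Flink 1.x the shim cannot read task info from a FunctionContext and returns null;
        // fall back to a value obtained elsewhere (e.g. via the RuntimeContext overload).
        Integer numSubtasks = RuntimeContextUtils.getNumberOfParallelSubtasks(context);
        return numSubtasks != null ? numSubtasks : fallbackFromRuntimeContext;
    }
}
```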
diff --git a/paimon-flink/paimon-flink2-common/pom.xml b/paimon-flink/paimon-flink2-common/pom.xml
index 2a5271f78b..845534c71d 100644
--- a/paimon-flink/paimon-flink2-common/pom.xml
+++ b/paimon-flink/paimon-flink2-common/pom.xml
@@ -49,6 +49,12 @@ under the License.
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-table-common</artifactId>
+ <version>${flink.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
similarity index 68%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
rename to paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
index 34e0d041b6..ff5fa86812 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
+++ b/paimon-flink/paimon-flink2-common/src/main/java/org/apache/paimon/flink/utils/RuntimeContextUtils.java
@@ -19,6 +19,10 @@
package org.apache.paimon.flink.utils;
import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.table.connector.source.LookupTableSource;
+import org.apache.flink.table.functions.FunctionContext;
+
+import javax.annotation.Nullable;
/** Utility methods about Flink runtime context to resolve compatibility issues. */
public class RuntimeContextUtils {
@@ -29,4 +33,16 @@ public class RuntimeContextUtils {
public static int getIndexOfThisSubtask(RuntimeContext context) {
return context.getTaskInfo().getIndexOfThisSubtask();
}
+
+    public static @Nullable Integer getNumberOfParallelSubtasks(FunctionContext context) {
+        return context.getTaskInfo().getNumberOfParallelSubtasks();
+    }
+
+    public static @Nullable Integer getIndexOfThisSubtask(FunctionContext context) {
+        return context.getTaskInfo().getIndexOfThisSubtask();
+    }
+
+    public static boolean preferCustomShuffle(LookupTableSource.LookupContext context) {
+        return context.preferCustomShuffle();
+    }
}