This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new f067979ef [flink] Balance MonitorFunction data shuffle for partitioned table (#3180)
f067979ef is described below

commit f067979ef6da3362923579838901e31e78818221
Author: 吴祥平 <[email protected]>
AuthorDate: Tue Apr 9 18:45:31 2024 +0800

    [flink] Balance MonitorFunction data shuffle for partitioned table (#3180)
---
 .../org/apache/paimon/flink/source/operator/MonitorFunction.java | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
index 30ac622ce..254e3acd2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MonitorFunction.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.source.operator;
 
 import org.apache.paimon.flink.utils.JavaTypeInfo;
+import org.apache.paimon.table.sink.ChannelComputer;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.ReadBuilder;
@@ -236,8 +237,12 @@ public class MonitorFunction extends RichSourceFunction<Split>
                         new JavaTypeInfo<>(Split.class))
                 .forceNonParallel()
                 .partitionCustom(
-                        (key, numPartitions) -> key % numPartitions,
-                        split -> ((DataSplit) split).bucket())
+                        (key, numPartitions) ->
+                                ChannelComputer.select(key.f0, key.f1, numPartitions),
+                        split -> {
+                            DataSplit dataSplit = (DataSplit) split;
+                            return Tuple2.of(dataSplit.partition(), dataSplit.bucket());
+                        })
                 .transform(name + "-Reader", typeInfo, new ReadOperator(readBuilder));
     }
 }

Reply via email to