This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new a1cfab9ef [FLINK-38833][paimon] Shuffle record to different subtasks by table, partition and bucket.  (#4298)
a1cfab9ef is described below

commit a1cfab9ef6e23017f06e29d07be9aa217f71ea1c
Author: Pei Yu <[email protected]>
AuthorDate: Wed Mar 4 17:32:56 2026 +0800

    [FLINK-38833][paimon] Shuffle record to different subtasks by table, partition and bucket.  (#4298)
    
    Signed-off-by: Pei Yu <[email protected]>
---
 .../cdc/connectors/paimon/sink/v2/PaimonEventSink.java   |  1 +
 .../paimon/sink/v2/bucket/BucketAssignOperator.java      |  9 ++++++++-
 .../paimon/sink/v2/bucket/BucketWrapperChangeEvent.java  | 16 +++++++++++++---
 .../sink/v2/bucket/BucketWrapperEventSerializer.java     |  5 ++++-
 4 files changed, 26 insertions(+), 5 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
index eb2af9f43..9487a3cc2 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
@@ -69,6 +69,7 @@ public class PaimonEventSink extends PaimonSink<Event> 
implements WithPreWriteTo
                             if (event instanceof BucketWrapperChangeEvent) {
                                 // Add hash of tableId to avoid data skew.
                                 return ((BucketWrapperChangeEvent) 
event).getBucket()
+                                        + ((BucketWrapperChangeEvent) 
event).getPartition()
                                         + ((BucketWrapperChangeEvent) 
event).tableId().hashCode();
                             } else {
                                 return ((BucketWrapper) event).getBucket();
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index 1e2dd65a1..faa832fc7 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -157,6 +157,7 @@ public class BucketAssignOperator extends 
AbstractStreamOperator<Event>
                     bucketAssignerMap.computeIfAbsent(
                             dataChangeEvent.tableId(), this::getTableInfo);
             int bucket;
+            int partition;
             GenericRow genericRow =
                     PaimonWriterHelper.convertEventToGenericRow(
                             dataChangeEvent,
@@ -171,17 +172,20 @@ public class BucketAssignOperator extends 
AbstractStreamOperator<Event>
                                 tuple4.f2.assign(
                                         tuple4.f3.partition(genericRow),
                                         
tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
+                        partition = tuple4.f3.partition(genericRow).hashCode();
                         break;
                     }
                 case HASH_FIXED:
                     {
                         tuple4.f1.setRecord(genericRow);
                         bucket = tuple4.f1.bucket();
+                        partition = tuple4.f1.partition().hashCode();
                         break;
                     }
                 case BUCKET_UNAWARE:
                     {
                         bucket = 0;
+                        partition = 0;
                         break;
                     }
                 case KEY_DYNAMIC:
@@ -191,7 +195,8 @@ public class BucketAssignOperator extends 
AbstractStreamOperator<Event>
                     }
             }
             output.collect(
-                    new StreamRecord<>(new BucketWrapperChangeEvent(bucket, 
dataChangeEvent)));
+                    new StreamRecord<>(
+                            new BucketWrapperChangeEvent(bucket, partition, 
dataChangeEvent)));
         } else {
             // Broadcast SchemachangeEvent.
             for (int index = 0; index < totalTasksNumber; index++) {
@@ -199,6 +204,7 @@ public class BucketAssignOperator extends 
AbstractStreamOperator<Event>
                         new StreamRecord<>(
                                 new BucketWrapperChangeEvent(
                                         index,
+                                        0,
                                         
convertSchemaChangeEvent((SchemaChangeEvent) event))));
             }
         }
@@ -279,6 +285,7 @@ public class BucketAssignOperator extends 
AbstractStreamOperator<Event>
                             new StreamRecord<>(
                                     new BucketWrapperChangeEvent(
                                             index,
+                                            0,
                                             new CreateTableEvent(
                                                     tableId,
                                                     
mixedSchemaInfo.paimonSchemaInfo
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
index 6c0b3c155..95e3888fc 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
@@ -27,11 +27,13 @@ import java.util.Objects;
 public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper, 
Serializable {
     private static final long serialVersionUID = 1L;
     private final int bucket;
+    private final int partition;
 
     private final ChangeEvent innerEvent;
 
-    public BucketWrapperChangeEvent(int bucket, ChangeEvent innerEvent) {
+    public BucketWrapperChangeEvent(int bucket, int partition, ChangeEvent 
innerEvent) {
         this.bucket = bucket;
+        this.partition = partition;
         this.innerEvent = innerEvent;
     }
 
@@ -39,6 +41,10 @@ public class BucketWrapperChangeEvent implements 
ChangeEvent, BucketWrapper, Ser
         return bucket;
     }
 
+    public int getPartition() {
+        return partition;
+    }
+
     public ChangeEvent getInnerEvent() {
         return innerEvent;
     }
@@ -57,12 +63,14 @@ public class BucketWrapperChangeEvent implements 
ChangeEvent, BucketWrapper, Ser
             return false;
         }
         BucketWrapperChangeEvent that = (BucketWrapperChangeEvent) o;
-        return bucket == that.bucket && Objects.equals(innerEvent, 
that.innerEvent);
+        return bucket == that.bucket
+                && partition == that.partition
+                && Objects.equals(innerEvent, that.innerEvent);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(bucket, innerEvent);
+        return Objects.hash(bucket, partition, innerEvent);
     }
 
     @Override
@@ -70,6 +78,8 @@ public class BucketWrapperChangeEvent implements ChangeEvent, 
BucketWrapper, Ser
         return "BucketWrapperChangeEvent{"
                 + "bucket="
                 + bucket
+                + ", partition="
+                + partition
                 + ", innerEvent="
                 + innerEvent
                 + '}';
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
index 267500988..31d4452f9 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
@@ -82,6 +82,7 @@ public class BucketWrapperEventSerializer extends 
TypeSerializerSingleton<Event>
             BucketWrapperChangeEvent bucketWrapperChangeEvent = 
(BucketWrapperChangeEvent) event;
             enumSerializer.serialize(EventClass.BUCKET_WRAPPER_CHANGE_EVENT, 
dataOutputView);
             dataOutputView.writeInt(bucketWrapperChangeEvent.getBucket());
+            dataOutputView.writeInt(bucketWrapperChangeEvent.getPartition());
             
eventSerializer.serialize(bucketWrapperChangeEvent.getInnerEvent(), 
dataOutputView);
         } else if (event instanceof BucketWrapperFlushEvent) {
             enumSerializer.serialize(EventClass.BUCKET_WRAPPER_FLUSH_EVENT, 
dataOutputView);
@@ -107,7 +108,9 @@ public class BucketWrapperEventSerializer extends 
TypeSerializerSingleton<Event>
                     schemaChangeEventTypeEnumSerializer.deserialize(source));
         } else {
             return new BucketWrapperChangeEvent(
-                    source.readInt(), (ChangeEvent) 
eventSerializer.deserialize(source));
+                    source.readInt(),
+                    source.readInt(),
+                    (ChangeEvent) eventSerializer.deserialize(source));
         }
     }
 

Reply via email to