This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new a1cfab9ef [FLINK-38833][paimon] Shuffle record to different subtasks
by table, partition and bucket. (#4298)
a1cfab9ef is described below
commit a1cfab9ef6e23017f06e29d07be9aa217f71ea1c
Author: Pei Yu <[email protected]>
AuthorDate: Wed Mar 4 17:32:56 2026 +0800
[FLINK-38833][paimon] Shuffle record to different subtasks by table,
partition and bucket. (#4298)
Signed-off-by: Pei Yu <[email protected]>
---
.../cdc/connectors/paimon/sink/v2/PaimonEventSink.java | 1 +
.../paimon/sink/v2/bucket/BucketAssignOperator.java | 9 ++++++++-
.../paimon/sink/v2/bucket/BucketWrapperChangeEvent.java | 16 +++++++++++++---
.../sink/v2/bucket/BucketWrapperEventSerializer.java | 5 ++++-
4 files changed, 26 insertions(+), 5 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
index eb2af9f43..9487a3cc2 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonEventSink.java
@@ -69,6 +69,7 @@ public class PaimonEventSink extends PaimonSink<Event>
implements WithPreWriteTo
if (event instanceof BucketWrapperChangeEvent) {
// Add hash of tableId to avoid data skew.
return ((BucketWrapperChangeEvent)
event).getBucket()
+ + ((BucketWrapperChangeEvent)
event).getPartition()
+ ((BucketWrapperChangeEvent)
event).tableId().hashCode();
} else {
return ((BucketWrapper) event).getBucket();
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
index 1e2dd65a1..faa832fc7 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketAssignOperator.java
@@ -157,6 +157,7 @@ public class BucketAssignOperator extends
AbstractStreamOperator<Event>
bucketAssignerMap.computeIfAbsent(
dataChangeEvent.tableId(), this::getTableInfo);
int bucket;
+ int partition;
GenericRow genericRow =
PaimonWriterHelper.convertEventToGenericRow(
dataChangeEvent,
@@ -171,17 +172,20 @@ public class BucketAssignOperator extends
AbstractStreamOperator<Event>
tuple4.f2.assign(
tuple4.f3.partition(genericRow),
tuple4.f3.trimmedPrimaryKey(genericRow).hashCode());
+ partition = tuple4.f3.partition(genericRow).hashCode();
break;
}
case HASH_FIXED:
{
tuple4.f1.setRecord(genericRow);
bucket = tuple4.f1.bucket();
+ partition = tuple4.f1.partition().hashCode();
break;
}
case BUCKET_UNAWARE:
{
bucket = 0;
+ partition = 0;
break;
}
case KEY_DYNAMIC:
@@ -191,7 +195,8 @@ public class BucketAssignOperator extends
AbstractStreamOperator<Event>
}
}
output.collect(
- new StreamRecord<>(new BucketWrapperChangeEvent(bucket,
dataChangeEvent)));
+ new StreamRecord<>(
+ new BucketWrapperChangeEvent(bucket, partition,
dataChangeEvent)));
} else {
// Broadcast SchemachangeEvent.
for (int index = 0; index < totalTasksNumber; index++) {
@@ -199,6 +204,7 @@ public class BucketAssignOperator extends
AbstractStreamOperator<Event>
new StreamRecord<>(
new BucketWrapperChangeEvent(
index,
+ 0,
convertSchemaChangeEvent((SchemaChangeEvent) event))));
}
}
@@ -279,6 +285,7 @@ public class BucketAssignOperator extends
AbstractStreamOperator<Event>
new StreamRecord<>(
new BucketWrapperChangeEvent(
index,
+ 0,
new CreateTableEvent(
tableId,
mixedSchemaInfo.paimonSchemaInfo
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
index 6c0b3c155..95e3888fc 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperChangeEvent.java
@@ -27,11 +27,13 @@ import java.util.Objects;
public class BucketWrapperChangeEvent implements ChangeEvent, BucketWrapper,
Serializable {
private static final long serialVersionUID = 1L;
private final int bucket;
+ private final int partition;
private final ChangeEvent innerEvent;
- public BucketWrapperChangeEvent(int bucket, ChangeEvent innerEvent) {
+ public BucketWrapperChangeEvent(int bucket, int partition, ChangeEvent
innerEvent) {
this.bucket = bucket;
+ this.partition = partition;
this.innerEvent = innerEvent;
}
@@ -39,6 +41,10 @@ public class BucketWrapperChangeEvent implements
ChangeEvent, BucketWrapper, Ser
return bucket;
}
+ public int getPartition() {
+ return partition;
+ }
+
public ChangeEvent getInnerEvent() {
return innerEvent;
}
@@ -57,12 +63,14 @@ public class BucketWrapperChangeEvent implements
ChangeEvent, BucketWrapper, Ser
return false;
}
BucketWrapperChangeEvent that = (BucketWrapperChangeEvent) o;
- return bucket == that.bucket && Objects.equals(innerEvent,
that.innerEvent);
+ return bucket == that.bucket
+ && partition == that.partition
+ && Objects.equals(innerEvent, that.innerEvent);
}
@Override
public int hashCode() {
- return Objects.hash(bucket, innerEvent);
+ return Objects.hash(bucket, partition, innerEvent);
}
@Override
@@ -70,6 +78,8 @@ public class BucketWrapperChangeEvent implements ChangeEvent,
BucketWrapper, Ser
return "BucketWrapperChangeEvent{"
+ "bucket="
+ bucket
+ + ", partition="
+ + partition
+ ", innerEvent="
+ innerEvent
+ '}';
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
index 267500988..31d4452f9 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/bucket/BucketWrapperEventSerializer.java
@@ -82,6 +82,7 @@ public class BucketWrapperEventSerializer extends
TypeSerializerSingleton<Event>
BucketWrapperChangeEvent bucketWrapperChangeEvent =
(BucketWrapperChangeEvent) event;
enumSerializer.serialize(EventClass.BUCKET_WRAPPER_CHANGE_EVENT,
dataOutputView);
dataOutputView.writeInt(bucketWrapperChangeEvent.getBucket());
+ dataOutputView.writeInt(bucketWrapperChangeEvent.getPartition());
eventSerializer.serialize(bucketWrapperChangeEvent.getInnerEvent(),
dataOutputView);
} else if (event instanceof BucketWrapperFlushEvent) {
enumSerializer.serialize(EventClass.BUCKET_WRAPPER_FLUSH_EVENT,
dataOutputView);
@@ -107,7 +108,9 @@ public class BucketWrapperEventSerializer extends
TypeSerializerSingleton<Event>
schemaChangeEventTypeEnumSerializer.deserialize(source));
} else {
return new BucketWrapperChangeEvent(
- source.readInt(), (ChangeEvent)
eventSerializer.deserialize(source));
+ source.readInt(),
+ source.readInt(),
+ (ChangeEvent) eventSerializer.deserialize(source));
}
}