This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new cfaf3cbe1 [FLINK-38491][pipeline-connector][iceberg] fixed iceberg compaction parallelism (#4150)
cfaf3cbe1 is described below
commit cfaf3cbe10f94621ac08934848972e2bcd3d90b3
Author: fcfangcc <[email protected]>
AuthorDate: Mon Nov 17 20:41:14 2025 +0800
[FLINK-38491][pipeline-connector][iceberg] fixed iceberg compaction parallelism (#4150)
---
.../flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java | 1 +
.../org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java | 2 +-
.../flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java | 1 +
.../flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java | 3 ++-
4 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 722f5f7bc..d13f8f070 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -122,6 +122,7 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
options.add(IcebergDataSinkOptions.PARTITION_KEY);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
+ options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);
return options;
}
}
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
index 02d693aba..2d3269f55 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
@@ -136,7 +136,7 @@ public class IcebergSink
// Shuffle by different table id.
DataStream<CommittableMessage<WriteResultWrapper>> keyedStream =
committableMessageDataStream.partitionCustom(
- (bucket, numPartitions) -> bucket % numPartitions,
+ Math::floorMod,
(committableMessage) -> {
if (committableMessage instanceof CommittableWithLineage) {
WriteResultWrapper multiTableCommittable =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 8a5e5e4d0..126491b73 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -40,6 +40,7 @@ public class IcebergDataSinkFactoryTest {
Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse");
+ conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4);
DataSink dataSink =
sinkFactory.createDataSink(
new FactoryHelper.DefaultContext(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
index a150e7188..f73a8c921 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
@@ -150,7 +150,8 @@ public class CompactionOperatorTest {
}
CompactionOperator compactionOperator =
new CompactionOperator(
- catalogOptions, CompactionOptions.builder().commitInterval(1).build());
+ catalogOptions,
+ CompactionOptions.builder().commitInterval(1).parallelism(4).build());
compactionOperator.processElement(
new StreamRecord<>(
new CommittableWithLineage<>(