This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new cfaf3cbe1 [FLINK-38491][pipeline-connector][iceberg] fixed iceberg compaction parallelism (#4150)
cfaf3cbe1 is described below

commit cfaf3cbe10f94621ac08934848972e2bcd3d90b3
Author: fcfangcc <[email protected]>
AuthorDate: Mon Nov 17 20:41:14 2025 +0800

    [FLINK-38491][pipeline-connector][iceberg] fixed iceberg compaction parallelism (#4150)
---
 .../flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java      | 1 +
 .../org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java   | 2 +-
 .../flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java  | 1 +
 .../flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java   | 3 ++-
 4 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
index 722f5f7bc..d13f8f070 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactory.java
@@ -122,6 +122,7 @@ public class IcebergDataSinkFactory implements DataSinkFactory {
         options.add(IcebergDataSinkOptions.PARTITION_KEY);
         options.add(IcebergDataSinkOptions.SINK_COMPACTION_ENABLED);
         options.add(IcebergDataSinkOptions.SINK_COMPACTION_COMMIT_INTERVAL);
+        options.add(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM);
         return options;
     }
 }
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
index 02d693aba..2d3269f55 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/IcebergSink.java
@@ -136,7 +136,7 @@ public class IcebergSink
             // Shuffle by different table id.
             DataStream<CommittableMessage<WriteResultWrapper>> keyedStream =
                     committableMessageDataStream.partitionCustom(
-                            (bucket, numPartitions) -> bucket % numPartitions,
+                            Math::floorMod,
                             (committableMessage) -> {
                                 if (committableMessage instanceof CommittableWithLineage) {
                                     WriteResultWrapper multiTableCommittable =
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
index 8a5e5e4d0..126491b73 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergDataSinkFactoryTest.java
@@ -40,6 +40,7 @@ public class IcebergDataSinkFactoryTest {
 
         Configuration conf = Configuration.fromMap(ImmutableMap.<String, String>builder().build());
         conf.set(IcebergDataSinkOptions.WAREHOUSE, "/tmp/warehouse");
+        conf.set(IcebergDataSinkOptions.SINK_COMPACTION_PARALLELISM, 4);
         DataSink dataSink =
                 sinkFactory.createDataSink(
                         new FactoryHelper.DefaultContext(
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
index a150e7188..f73a8c921 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/test/java/org/apache/flink/cdc/connectors/iceberg/sink/v2/CompactionOperatorTest.java
@@ -150,7 +150,8 @@ public class CompactionOperatorTest {
         }
         CompactionOperator compactionOperator =
                 new CompactionOperator(
-                        catalogOptions, CompactionOptions.builder().commitInterval(1).build());
+                        catalogOptions,
+                        CompactionOptions.builder().commitInterval(1).parallelism(4).build());
         compactionOperator.processElement(
                 new StreamRecord<>(
                         new CommittableWithLineage<>(

Reply via email to