This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 344779856 [FLINK-37577][pipeline-connector/paimon] Enable 
waitCompaction feature in PaimonWriter only when deletion-vectors.enabled 
option enabled
344779856 is described below

commit 344779856a9c37e2fbdb018c5cb1d85597a3d240
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Thu Apr 3 15:21:48 2025 +0800

    [FLINK-37577][pipeline-connector/paimon] Enable waitCompaction feature in 
PaimonWriter only when deletion-vectors.enabled option enabled
    
    This closes  #3971.
---
 .../flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java  | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
index 72cc8a6a5..bf660454e 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 
+import org.apache.paimon.CoreOptions;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.data.GenericRow;
@@ -107,7 +108,7 @@ public class PaimonWriter<InputT>
                                         // avoid prepareCommit the same 
checkpointId with the first
                                         // round.
                                         return entry.getValue()
-                                                .prepareCommit(true, 
lastCheckpointId + 1).stream()
+                                                .prepareCommit(false, 
lastCheckpointId + 1).stream()
                                                 .map(
                                                         committable ->
                                                                 
MultiTableCommittable
@@ -155,13 +156,22 @@ public class PaimonWriter<InputT>
                     writes.computeIfAbsent(
                             tableId,
                             id -> {
+                                boolean waitCompaction =
+                                        Boolean.parseBoolean(
+                                                table.options()
+                                                        .getOrDefault(
+                                                                
CoreOptions.DELETION_VECTORS_ENABLED
+                                                                        .key(),
+                                                                
CoreOptions.DELETION_VECTORS_ENABLED
+                                                                        
.defaultValue()
+                                                                        
.toString()));
                                 StoreSinkWriteImpl storeSinkWrite =
                                         new StoreSinkWriteImpl(
                                                 table,
                                                 commitUser,
                                                 ioManager,
                                                 false,
-                                                false,
+                                                waitCompaction,
                                                 true,
                                                 memoryPoolFactory,
                                                 metricGroup);

Reply via email to