This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new 344779856 [FLINK-37577][pipeline-connector/paimon] Enable waitCompaction feature in PaimonWriter only when deletion-vectors.enabled option enabled 344779856 is described below commit 344779856a9c37e2fbdb018c5cb1d85597a3d240 Author: Kunni <lvyanquan....@alibaba-inc.com> AuthorDate: Thu Apr 3 15:21:48 2025 +0800 [FLINK-37577][pipeline-connector/paimon] Enable waitCompaction feature in PaimonWriter only when deletion-vectors.enabled option enabled This closes #3971. --- .../flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java index 72cc8a6a5..bf660454e 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonWriter.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.streaming.api.operators.StreamOperator; +import org.apache.paimon.CoreOptions; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.GenericRow; @@ -107,7 +108,7 @@ public class PaimonWriter<InputT> // avoid prepareCommit the same checkpointId with the first // round. return entry.getValue() - .prepareCommit(true, lastCheckpointId + 1).stream() + .prepareCommit(false, lastCheckpointId + 1).stream() .map( committable -> MultiTableCommittable @@ -155,13 +156,22 @@ public class PaimonWriter<InputT> writes.computeIfAbsent( tableId, id -> { + boolean waitCompaction = + Boolean.parseBoolean( + table.options() + .getOrDefault( + CoreOptions.DELETION_VECTORS_ENABLED + .key(), + CoreOptions.DELETION_VECTORS_ENABLED + .defaultValue() + .toString())); StoreSinkWriteImpl storeSinkWrite = new StoreSinkWriteImpl( table, commitUser, ioManager, false, - false, + waitCompaction, true, memoryPoolFactory, metricGroup);