This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bb1f3ca4 [FLINK-35938][pipeline-connector][paimon] Use 
filterAndCommit API for retry Committable to avoid duplicate commit
3bb1f3ca4 is described below

commit 3bb1f3ca45e58e4489cc7a03ef928de40895d138
Author: Kunni <[email protected]>
AuthorDate: Mon Aug 12 10:27:19 2024 +0800

    [FLINK-35938][pipeline-connector][paimon] Use filterAndCommit API for retry 
Committable to avoid duplicate commit
    
    This closes #3504.
---
 .../connectors/paimon/sink/v2/PaimonCommitter.java | 31 +++++++++++++++++++---
 1 file changed, 27 insertions(+), 4 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
index d0db819e1..f89809468 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
@@ -24,8 +24,9 @@ import org.apache.paimon.flink.sink.MultiTableCommittable;
 import org.apache.paimon.flink.sink.StoreMultiCommitter;
 import org.apache.paimon.manifest.WrappedManifestCommittable;
 import org.apache.paimon.options.Options;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -34,6 +35,8 @@ import java.util.stream.Collectors;
 /** A {@link Committer} to commit write results for multiple tables. */
 public class PaimonCommitter implements Committer<MultiTableCommittable> {
 
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PaimonCommitter.class);
+
     private final StoreMultiCommitter storeMultiCommitter;
 
     public PaimonCommitter(Options catalogOptions, String commitUser) {
@@ -46,19 +49,39 @@ public class PaimonCommitter implements 
Committer<MultiTableCommittable> {
     }
 
     @Override
-    public void commit(Collection<CommitRequest<MultiTableCommittable>> 
commitRequests)
-            throws IOException, InterruptedException {
+    public void commit(Collection<CommitRequest<MultiTableCommittable>> 
commitRequests) {
         if (commitRequests.isEmpty()) {
             return;
         }
+
         List<MultiTableCommittable> committables =
                 commitRequests.stream()
                         .map(CommitRequest::getCommittable)
                         .collect(Collectors.toList());
+        // All CommitRequest shared the same checkpointId.
         long checkpointId = committables.get(0).checkpointId();
+        int retriedNumber = 
commitRequests.stream().findFirst().get().getNumberOfRetries();
         WrappedManifestCommittable wrappedManifestCommittable =
                 storeMultiCommitter.combine(checkpointId, 1L, committables);
-        
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
+        try {
+            if (retriedNumber > 0) {
+                storeMultiCommitter.filterAndCommit(
+                        Collections.singletonList(wrappedManifestCommittable));
+            } else {
+                
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
+            }
+            commitRequests.forEach(CommitRequest::signalAlreadyCommitted);
+            LOGGER.info(
+                    String.format(
+                            "Commit succeeded for %s with %s committable",
+                            checkpointId, committables.size()));
+        } catch (Exception e) {
+            commitRequests.forEach(CommitRequest::retryLater);
+            LOGGER.warn(
+                    String.format(
+                            "Commit failed for %s with %s committable",
+                            checkpointId, committables.size()));
+        }
     }
 
     @Override

Reply via email to