This is an automated email from the ASF dual-hosted git repository.

xushiyan pushed a commit to branch release-0.11.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fcaeaea62f42e0bd1e49970ebcc1830519c80f2a
Author: 吴祥平 <[email protected]>
AuthorDate: Wed Apr 20 19:23:39 2022 +0800

    [HUDI-3912] Fix data loss on rollback in Flink async compaction (#5357)
    
    * Stop adding events when there is a failed compaction event
    
    Co-authored-by: wxp <[email protected]>
---
 .../apache/hudi/sink/compact/CompactionCommitSink.java  | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
index ecd66936e8..c9fb7aceb2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/compact/CompactionCommitSink.java
@@ -101,11 +101,6 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
   @Override
   public void invoke(CompactionCommitEvent event, Context context) throws 
Exception {
     final String instant = event.getInstant();
-    if (event.isFailed()) {
-      // handle failure case
-      CompactionUtil.rollbackCompaction(table, event.getInstant());
-      return;
-    }
     commitBuffer.computeIfAbsent(instant, k -> new HashMap<>())
         .put(event.getFileId(), event);
     commitIfNecessary(instant, commitBuffer.get(instant).values());
@@ -132,6 +127,18 @@ public class CompactionCommitSink extends 
CleanFunction<CompactionCommitEvent> {
     if (!isReady) {
       return;
     }
+
+    if (events.stream().anyMatch(CompactionCommitEvent::isFailed)) {
+      try {
+        // handle failure case
+        CompactionUtil.rollbackCompaction(table, instant);
+      } finally {
+        // remove commitBuffer to avoid obsolete metadata commit
+        reset(instant);
+        return;
+      }
+    }
+
     try {
       doCommit(instant, events);
     } catch (Throwable throwable) {

Reply via email to