This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 802d75b285bac354b2b106fd72f79498c1e389cb
Author: Jon Vexler <[email protected]>
AuthorDate: Wed Aug 23 22:30:41 2023 -0400

    [HUDI-6718] Check Timeline Before Transitioning Inflight Clean in 
Multiwriter Scenario (#9468)
    
    - If two cleans start at nearly the same time, they will both attempt to 
execute the same clean instants. This does not cause any data corruption, but 
it will cause the second writer to fail when it attempts to create the commit 
in the timeline, because the commit will already have been written by the 
first writer. Now, when finishing a pending clean fails with a 
HoodieIOException, we reload the timeline and rethrow the error only if no 
other writer has already completed that clean instant.
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/table/action/clean/CleanActionExecutor.java       | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
index 05e1056324a..c931e7bce9d 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanActionExecutor.java
@@ -261,8 +261,10 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
           LOG.info("Finishing previously unfinished cleaner instant=" + 
hoodieInstant);
           try {
             cleanMetadataList.add(runPendingClean(table, hoodieInstant));
+          } catch (HoodieIOException e) {
+            checkIfOtherWriterCommitted(hoodieInstant, e);
           } catch (Exception e) {
-            LOG.warn("Failed to perform previous clean operation, instant: " + 
hoodieInstant, e);
+            LOG.error("Failed to perform previous clean operation, instant: " 
+ hoodieInstant, e);
             throw e;
           }
         }
@@ -278,4 +280,14 @@ public class CleanActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I, K,
     // This requires the CleanActionExecutor to be refactored as 
BaseCommitActionExecutor
     return cleanMetadataList.size() > 0 ? 
cleanMetadataList.get(cleanMetadataList.size() - 1) : null;
   }
+
+  private void checkIfOtherWriterCommitted(HoodieInstant hoodieInstant, 
HoodieIOException e) {
+    table.getMetaClient().reloadActiveTimeline();
+    if 
(table.getCleanTimeline().filterCompletedInstants().containsInstant(hoodieInstant.getTimestamp()))
 {
+      LOG.warn("Clean operation was completed by another writer for instant: " 
+ hoodieInstant);
+    } else {
+      LOG.error("Failed to perform previous clean operation, instant: " + 
hoodieInstant, e);
+      throw e;
+    }
+  }
 }

Reply via email to