This is an automated email from the ASF dual-hosted git repository.

vhs pushed a commit to branch release-1.0.2
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit a3dcd03579c7583ae90aad6b2cba89618149a0d5
Author: Danny Chan <[email protected]>
AuthorDate: Thu Apr 10 19:53:15 2025 +0800

    [HUDI-9286] Fix the atomicity of archived timeline write (#13128)
    
    (cherry picked from commit 934754464670f3ba4e6f1e9410edf37906c86dc5)
---
 .../timeline/versioning/v2/LSMTimelineWriter.java  | 33 ++++++++++++++++++++--
 1 file changed, 30 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
index ba393feb807..36de4abdfc6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -116,10 +117,27 @@ public class LSMTimelineWriter {
   public void write(
       List<ActiveAction> activeActions,
       Option<Consumer<ActiveAction>> preWriteCallback,
-      Option<Consumer<Exception>> exceptionHandler) throws 
HoodieCommitException {
+      Option<Consumer<Exception>> exceptionHandler) {
     ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant 
actions to write should not be empty");
     StoragePath filePath = new StoragePath(this.archivePath,
         newFileName(activeActions.get(0).getInstantTime(), 
activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try {
+      if (this.metaClient.getStorage().exists(filePath)) {
+        // there are 2 cases this could happen:
+        // 1. the file was created but did not flush/close correctly and left 
as corrupt;
+        // 2. the file was in complete state but the archiving fails during 
the deletion of active metadata files.
+        if (isFileCommitted(filePath.getName())) {
+          // case2: the last archiving succeeded for committing to the archive 
timeline, just returns early.
+          LOG.warn("Skip archiving for the redundant file: {}", filePath);
+          return;
+        } else {
+          // case1: delete the corrupt file and retry.
+          this.metaClient.getStorage().deleteFile(filePath);
+        }
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to check archiving file before 
write: " + filePath, ioe);
+    }
     try (HoodieFileWriter writer = openWriter(filePath)) {
       Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
       LOG.info("Writing schema " + wrapperSchema.toString());
@@ -139,8 +157,8 @@ public class LSMTimelineWriter {
     }
     try {
       updateManifest(filePath.getName());
-    } catch (Exception e) {
-      throw new HoodieCommitException("Failed to update archiving manifest", 
e);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to update archiving manifest", e);
     }
   }
 
@@ -332,6 +350,15 @@ public class LSMTimelineWriter {
     }
   }
 
+  /**
+   * Returns whether the archiving file is committed(visible to the timeline 
reader).
+   */
+  private boolean isFileCommitted(String fileName) throws IOException {
+    HoodieLSMTimelineManifest latestManifest = 
LSMTimeline.latestSnapshotManifest(metaClient, archivePath);
+    return latestManifest.getFiles()
+        .stream().anyMatch(fileEntry -> 
fileEntry.getFileName().equals(fileName));
+  }
+
   private HoodieLSMTimelineManifest.LSMFileEntry getFileEntry(String fileName) 
throws IOException {
     long fileLen = metaClient.getStorage().getPathInfo(
         new StoragePath(archivePath, fileName)).getLength();

Reply via email to