This is an automated email from the ASF dual-hosted git repository. vhs pushed a commit to branch release-1.0.2 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit a3dcd03579c7583ae90aad6b2cba89618149a0d5
Author: Danny Chan <[email protected]>
AuthorDate: Thu Apr 10 19:53:15 2025 +0800

    [HUDI-9286] Fix the atomicity of archived timeline write (#13128)

    (cherry picked from commit 934754464670f3ba4e6f1e9410edf37906c86dc5)
---
 .../timeline/versioning/v2/LSMTimelineWriter.java | 33 ++++++++++++++++++++--
 1 file changed, 30 insertions(+), 3 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
index ba393feb807..36de4abdfc6 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/LSMTimelineWriter.java
@@ -37,6 +37,7 @@ import org.apache.hudi.common.util.collection.ClosableIterator;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
 import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.io.hadoop.HoodieAvroParquetReader;
 import org.apache.hudi.io.storage.HoodieFileWriter;
@@ -116,10 +117,27 @@ public class LSMTimelineWriter {
   public void write(
       List<ActiveAction> activeActions,
       Option<Consumer<ActiveAction>> preWriteCallback,
-      Option<Consumer<Exception>> exceptionHandler) throws HoodieCommitException {
+      Option<Consumer<Exception>> exceptionHandler) {
     ValidationUtils.checkArgument(!activeActions.isEmpty(), "The instant actions to write should not be empty");
     StoragePath filePath = new StoragePath(this.archivePath,
         newFileName(activeActions.get(0).getInstantTime(), activeActions.get(activeActions.size() - 1).getInstantTime(), FILE_LAYER_ZERO));
+    try {
+      if (this.metaClient.getStorage().exists(filePath)) {
+        // there are 2 cases this could happen:
+        // 1. the file was created but did not flush/close correctly and left as corrupt;
+        // 2. the file was in complete state but the archiving fails during the deletion of active metadata files.
+        if (isFileCommitted(filePath.getName())) {
+          // case2: the last archiving succeeded for committing to the archive timeline, just returns early.
+          LOG.warn("Skip archiving for the redundant file: {}", filePath);
+          return;
+        } else {
+          // case1: delete the corrupt file and retry.
+          this.metaClient.getStorage().deleteFile(filePath);
+        }
+      }
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Failed to check archiving file before write: " + filePath, ioe);
+    }
     try (HoodieFileWriter writer = openWriter(filePath)) {
       Schema wrapperSchema = HoodieLSMTimelineInstant.getClassSchema();
       LOG.info("Writing schema " + wrapperSchema.toString());
@@ -139,8 +157,8 @@ public class LSMTimelineWriter {
     }
     try {
       updateManifest(filePath.getName());
-    } catch (Exception e) {
-      throw new HoodieCommitException("Failed to update archiving manifest", e);
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to update archiving manifest", e);
     }
   }
 
@@ -332,6 +350,15 @@ public class LSMTimelineWriter {
     }
   }
 
+  /**
+   * Returns whether the archiving file is committed(visible to the timeline reader).
+   */
+  private boolean isFileCommitted(String fileName) throws IOException {
+    HoodieLSMTimelineManifest latestManifest = LSMTimeline.latestSnapshotManifest(metaClient, archivePath);
+    return latestManifest.getFiles()
+        .stream().anyMatch(fileEntry -> fileEntry.getFileName().equals(fileName));
+  }
+
   private HoodieLSMTimelineManifest.LSMFileEntry getFileEntry(String fileName) throws IOException {
     long fileLen = metaClient.getStorage().getPathInfo(
         new StoragePath(archivePath, fileName)).getLength();
