This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit deb90df6409723c052aa11fc2208934fc2e10519
Author: Wechar Yu <[email protected]>
AuthorDate: Sat Apr 20 08:18:18 2024 +0800

    [HUDI-7515] Fix partition metadata write failure (#10886)
---
 .../apache/hudi/cli/commands/RepairsCommand.java   |  4 +-
 .../org/apache/hudi/io/HoodieAppendHandle.java     |  2 +-
 .../org/apache/hudi/io/HoodieCreateHandle.java     |  2 +-
 .../java/org/apache/hudi/io/HoodieMergeHandle.java |  2 +-
 .../io/storage/row/HoodieRowDataCreateHandle.java  |  2 +-
 .../hudi/io/storage/row/HoodieRowCreateHandle.java |  2 +-
 .../hudi/common/model/HoodiePartitionMetadata.java | 80 ++++++++++++----------
 .../table/timeline/HoodieActiveTimeline.java       |  6 +-
 .../common/model/TestHoodiePartitionMetadata.java  |  2 +-
 .../common/testutils/HoodieTestDataGenerator.java  |  5 +-
 .../hudi/common/util/TestTablePathUtils.java       |  4 +-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  2 +-
 .../org/apache/hudi/storage/HoodieStorage.java     |  2 +-
 .../RepairAddpartitionmetaProcedure.scala          |  2 +-
 .../RepairMigratePartitionMetaProcedure.scala      |  2 +-
 15 files changed, 60 insertions(+), 59 deletions(-)

diff --git 
a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java 
b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
index a41e57a0bb2..28e1a0d39ba 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java
@@ -139,7 +139,7 @@ public class RepairsCommand {
           HoodiePartitionMetadata partitionMetadata =
               new HoodiePartitionMetadata(HoodieCLI.storage, latestCommit, 
basePath, partitionPath,
                   client.getTableConfig().getPartitionMetafileFormat());
-          partitionMetadata.trySave(0);
+          partitionMetadata.trySave();
           row[2] = "Repaired";
         }
       }
@@ -256,7 +256,7 @@ public class RepairsCommand {
           HoodiePartitionMetadata partitionMetadata =
               new HoodiePartitionMetadata(HoodieCLI.storage, latestCommit, 
basePath, partition,
                   Option.of(client.getTableConfig().getBaseFileFormat()));
-          partitionMetadata.trySave(0);
+          partitionMetadata.trySave();
         }
 
         // delete it, in case we failed midway last time.
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
index 40613e15b1f..2bac318fc81 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java
@@ -208,7 +208,7 @@ public class HoodieAppendHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
         HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(storage, baseInstantTime,
             new StoragePath(config.getBasePath()), 
FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
             hoodieTable.getPartitionMetafileFormat());
-        partitionMetadata.trySave(getPartitionId());
+        partitionMetadata.trySave();
         this.writer = createLogWriter(fileSlice, baseInstantTime);
       } catch (Exception e) {
         LOG.error("Error in update task at commit " + instantTime, e);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
index 6f3824ac34c..0ad4e212a1a 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java
@@ -100,7 +100,7 @@ public class HoodieCreateHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O
           new StoragePath(config.getBasePath()),
           FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
           hoodieTable.getPartitionMetafileFormat());
-      partitionMetadata.trySave(getPartitionId());
+      partitionMetadata.trySave();
       createMarkerFile(partitionPath,
           FSUtils.makeBaseFileName(this.instantTime, this.writeToken, 
this.fileId, hoodieTable.getBaseFileExtension()));
       this.fileWriter =
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
index 55aa334a97a..afae82fd13f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java
@@ -176,7 +176,7 @@ public class HoodieMergeHandle<T, I, K, O> extends 
HoodieWriteHandle<T, I, K, O>
           new StoragePath(config.getBasePath()),
           FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
           hoodieTable.getPartitionMetafileFormat());
-      partitionMetadata.trySave(getPartitionId());
+      partitionMetadata.trySave();
 
       String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, 
fileId, hoodieTable.getBaseFileExtension());
       makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
index 56e38dc8ddf..19455773153 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java
@@ -106,7 +106,7 @@ public class HoodieRowDataCreateHandle implements 
Serializable {
               new StoragePath(writeConfig.getBasePath()),
               FSUtils.getPartitionPath(writeConfig.getBasePath(), 
partitionPath),
               table.getPartitionMetafileFormat());
-      partitionMetadata.trySave(taskPartitionId);
+      partitionMetadata.trySave();
       createMarkerFile(partitionPath, 
FSUtils.makeBaseFileName(this.instantTime, getWriteToken(), this.fileId, 
table.getBaseFileExtension()));
       this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType);
     } catch (IOException e) {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
index 98341bf62b4..890b12899f1 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java
@@ -143,7 +143,7 @@ public class HoodieRowCreateHandle implements Serializable {
               new StoragePath(writeConfig.getBasePath()),
               FSUtils.getPartitionPath(writeConfig.getBasePath(), 
partitionPath),
               table.getPartitionMetafileFormat());
-      partitionMetadata.trySave(taskPartitionId);
+      partitionMetadata.trySave();
 
       createMarkerFile(partitionPath, fileName, instantTime, table, 
writeConfig);
 
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
index adeaaa5be4f..a90d05aefdd 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java
@@ -20,6 +20,7 @@ package org.apache.hudi.common.model;
 
 import org.apache.hudi.common.util.BaseFileUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.RetryHelper;
 import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -30,12 +31,13 @@ import org.apache.hadoop.conf.Configuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.UUID;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -94,36 +96,29 @@ public class HoodiePartitionMetadata {
   /**
    * Write the metadata safely into the partition atomically.
    */
-  public void trySave(int taskPartitionId) {
-    String extension = getMetafileExtension();
-    StoragePath tmpMetaPath =
-        new StoragePath(partitionPath, 
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + "_" + 
taskPartitionId + extension);
-    StoragePath metaPath = new StoragePath(partitionPath, 
HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + extension);
-    boolean metafileExists = false;
+  public void trySave() throws HoodieIOException {
+    StoragePath metaPath = new StoragePath(
+        partitionPath, HOODIE_PARTITION_METAFILE_PREFIX + 
getMetafileExtension());
 
-    try {
-      metafileExists = storage.exists(metaPath);
-      if (!metafileExists) {
-        // write to temporary file
-        writeMetafile(tmpMetaPath);
-        // move to actual path
-        storage.rename(tmpMetaPath, metaPath);
-      }
-    } catch (IOException ioe) {
-      LOG.warn("Error trying to save partition metadata (this is okay, as long 
as at least 1 of these succeeded), "
-          + partitionPath, ioe);
-    } finally {
-      if (!metafileExists) {
-        try {
-          // clean up tmp file, if still lying around
-          if (storage.exists(tmpMetaPath)) {
-            storage.deleteFile(tmpMetaPath);
+    // This retry mechanism enables a fast exit in the metaPath exists check, 
which avoids
+    // task failures when two or more tasks try to create the 
same metaPath.
+    RetryHelper<Void, HoodieIOException>  retryHelper = new RetryHelper(1000, 
3, 1000, HoodieIOException.class.getName())
+        .tryWith(() -> {
+          if (!storage.exists(metaPath)) {
+            if (format.isPresent()) {
+              writeMetafileInFormat(metaPath, format.get());
+            } else {
+              // Backwards compatible properties file format
+              try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
+                props.store(os, "partition metadata");
+                Option content = Option.of(os.toByteArray());
+                storage.createImmutableFileInPath(metaPath, content);
+              }
+            }
           }
-        } catch (IOException ioe) {
-          LOG.warn("Error trying to clean up temporary files for " + 
partitionPath, ioe);
-        }
-      }
-    }
+          return null;
+        });
+    retryHelper.start();
   }
 
   private String getMetafileExtension() {
@@ -134,17 +129,26 @@ public class HoodiePartitionMetadata {
   /**
    * Write the partition metadata in the correct format in the given file path.
    *
-   * @param filePath path of the file to write.
+   * @param filePath Path of the file to write
+   * @param format Hoodie table file format
    * @throws IOException
    */
-  private void writeMetafile(StoragePath filePath) throws IOException {
-    if (format.isPresent()) {
-      BaseFileUtils.getInstance(format.get()).writeMetaFile(storage, filePath, 
props);
-    } else {
-      // Backwards compatible properties file format
-      try (OutputStream os = storage.create(filePath, true)) {
-        props.store(os, "partition metadata");
-        os.flush();
+  private void writeMetafileInFormat(StoragePath filePath, HoodieFileFormat 
format) throws IOException {
+    StoragePath tmpPath = new StoragePath(partitionPath,
+        HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() + 
getMetafileExtension());
+    try {
+      // write to temporary file
+      BaseFileUtils.getInstance(format).writeMetaFile(storage, tmpPath, props);
+      // move to actual path
+      storage.rename(tmpPath, filePath);
+    } finally {
+      try {
+        // clean up tmp file, if still lying around
+        if (storage.exists(tmpPath)) {
+          storage.deleteFile(tmpPath);
+        }
+      } catch (IOException ioe) {
+        LOG.warn("Error trying to clean up temporary files for " + 
partitionPath, ioe);
       }
     }
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
index 0545fe392fc..3c8d6aa4306 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java
@@ -796,11 +796,7 @@ public class HoodieActiveTimeline extends 
HoodieDefaultTimeline {
     if (allowOverwrite || 
metaClient.getTimelineLayoutVersion().isNullVersion()) {
       FileIOUtils.createFileInPath(metaClient.getStorage(), fullPath, content);
     } else {
-      try {
-        metaClient.getStorage().createImmutableFileInPath(fullPath, content);
-      } catch (IOException e) {
-        throw new HoodieIOException("Cannot create immutable file: " + 
fullPath, e);
-      }
+      metaClient.getStorage().createImmutableFileInPath(fullPath, content);
     }
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java
index 70474ec833f..ef01aa7deed 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java
@@ -77,7 +77,7 @@ public class TestHoodiePartitionMetadata extends 
HoodieCommonTestHarness {
     HoodiePartitionMetadata writtenMetadata = new HoodiePartitionMetadata(
         metaClient.getStorage(), commitTime, new StoragePath(basePath), 
partitionPath,
         format);
-    writtenMetadata.trySave(0);
+    writtenMetadata.trySave();
 
     // when
     HoodiePartitionMetadata readMetadata = new HoodiePartitionMetadata(
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index 9cb2ab3bfb7..a7440f8993a 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -253,7 +253,8 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
    * @deprecated please use non-static version
    */
   public static void writePartitionMetadataDeprecated(HoodieStorage storage,
-                                                      String[] partitionPaths, 
String basePath) {
+                                                      String[] partitionPaths,
+                                                      String basePath) {
     new HoodieTestDataGenerator().writePartitionMetadata(storage, 
partitionPaths, basePath);
   }
 
@@ -268,7 +269,7 @@ public class HoodieTestDataGenerator implements 
AutoCloseable {
                                      String basePath) {
     for (String partitionPath : partitionPaths) {
       new HoodiePartitionMetadata(storage, "000", new StoragePath(basePath),
-          new StoragePath(basePath, partitionPath), Option.empty()).trySave(0);
+          new StoragePath(basePath, partitionPath), Option.empty()).trySave();
     }
   }
 
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
index 2022ee8cfda..0db5c207463 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java
@@ -82,11 +82,11 @@ public final class TestTablePathUtils {
     HoodiePartitionMetadata partitionMetadata1 = new HoodiePartitionMetadata(
         storage, Instant.now().toString(), tablePath,
         partitionPath1, partitionMetafileFormat);
-    partitionMetadata1.trySave(1);
+    partitionMetadata1.trySave();
     HoodiePartitionMetadata partitionMetadata2 = new HoodiePartitionMetadata(
         storage, Instant.now().toString(), tablePath,
         partitionPath2, partitionMetafileFormat);
-    partitionMetadata2.trySave(2);
+    partitionMetadata2.trySave();
 
     // Create files
     URI filePathURI1 =
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index f208bd0e3c6..cfdd6c88395 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -512,7 +512,7 @@ public class InputFormatTestUtil {
               new StoragePath(partitionPath.toAbsolutePath().toString()),
               Option.of(HoodieFileFormat.PARQUET));
 
-      partitionMetadata.trySave((int) (Math.random() * 1000));
+      partitionMetadata.trySave();
     }
   }
 
diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java 
b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
index be160caba3b..b8735cc89d9 100644
--- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
+++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java
@@ -257,7 +257,7 @@ public abstract class HoodieStorage implements Closeable {
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
   public final void createImmutableFileInPath(StoragePath path,
-                                              Option<byte[]> content) throws 
IOException {
+                                              Option<byte[]> content) throws 
HoodieIOException {
     OutputStream fsout = null;
     StoragePath tmpPath = null;
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
index d13895af414..03ef6cc3f54 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala
@@ -70,7 +70,7 @@ class RepairAddpartitionmetaProcedure extends BaseProcedure 
with ProcedureBuilde
         if (!dryRun) {
           val partitionMetadata: HoodiePartitionMetadata = new 
HoodiePartitionMetadata(
             metaClient.getStorage, latestCommit, basePath, partitionPath, 
metaClient.getTableConfig.getPartitionMetafileFormat)
-          partitionMetadata.trySave(0)
+          partitionMetadata.trySave()
           action = "Repaired"
         }
       }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
index 5651055ee99..07fd7c92a68 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala
@@ -78,7 +78,7 @@ class RepairMigratePartitionMetaProcedure extends 
BaseProcedure with ProcedureBu
         if (!baseFormatFile.isPresent) {
           val partitionMetadata: HoodiePartitionMetadata = new 
HoodiePartitionMetadata(metaClient.getStorage, latestCommit,
             basePath, partition, 
Option.of(metaClient.getTableConfig.getBaseFileFormat))
-          partitionMetadata.trySave(0)
+          partitionMetadata.trySave()
         }
         // delete it, in case we failed midway last time.
         textFormatFile.ifPresent(

Reply via email to