This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit deb90df6409723c052aa11fc2208934fc2e10519 Author: Wechar Yu <[email protected]> AuthorDate: Sat Apr 20 08:18:18 2024 +0800 [HUDI-7515] Fix partition metadata write failure (#10886) --- .../apache/hudi/cli/commands/RepairsCommand.java | 4 +- .../org/apache/hudi/io/HoodieAppendHandle.java | 2 +- .../org/apache/hudi/io/HoodieCreateHandle.java | 2 +- .../java/org/apache/hudi/io/HoodieMergeHandle.java | 2 +- .../io/storage/row/HoodieRowDataCreateHandle.java | 2 +- .../hudi/io/storage/row/HoodieRowCreateHandle.java | 2 +- .../hudi/common/model/HoodiePartitionMetadata.java | 80 ++++++++++++---------- .../table/timeline/HoodieActiveTimeline.java | 6 +- .../common/model/TestHoodiePartitionMetadata.java | 2 +- .../common/testutils/HoodieTestDataGenerator.java | 5 +- .../hudi/common/util/TestTablePathUtils.java | 4 +- .../hudi/hadoop/testutils/InputFormatTestUtil.java | 2 +- .../org/apache/hudi/storage/HoodieStorage.java | 2 +- .../RepairAddpartitionmetaProcedure.scala | 2 +- .../RepairMigratePartitionMetaProcedure.scala | 2 +- 15 files changed, 60 insertions(+), 59 deletions(-) diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java index a41e57a0bb2..28e1a0d39ba 100644 --- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java +++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/RepairsCommand.java @@ -139,7 +139,7 @@ public class RepairsCommand { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(HoodieCLI.storage, latestCommit, basePath, partitionPath, client.getTableConfig().getPartitionMetafileFormat()); - partitionMetadata.trySave(0); + partitionMetadata.trySave(); row[2] = "Repaired"; } } @@ -256,7 +256,7 @@ public class RepairsCommand { HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(HoodieCLI.storage, latestCommit, basePath, partition, Option.of(client.getTableConfig().getBaseFileFormat())); 
- partitionMetadata.trySave(0); + partitionMetadata.trySave(); } // delete it, in case we failed midway last time. diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java index 40613e15b1f..2bac318fc81 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java @@ -208,7 +208,7 @@ public class HoodieAppendHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(storage, baseInstantTime, new StoragePath(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath), hoodieTable.getPartitionMetafileFormat()); - partitionMetadata.trySave(getPartitionId()); + partitionMetadata.trySave(); this.writer = createLogWriter(fileSlice, baseInstantTime); } catch (Exception e) { LOG.error("Error in update task at commit " + instantTime, e); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java index 6f3824ac34c..0ad4e212a1a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCreateHandle.java @@ -100,7 +100,7 @@ public class HoodieCreateHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O new StoragePath(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath), hoodieTable.getPartitionMetafileFormat()); - partitionMetadata.trySave(getPartitionId()); + partitionMetadata.trySave(); createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, this.writeToken, this.fileId, hoodieTable.getBaseFileExtension())); this.fileWriter 
= diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java index 55aa334a97a..afae82fd13f 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieMergeHandle.java @@ -176,7 +176,7 @@ public class HoodieMergeHandle<T, I, K, O> extends HoodieWriteHandle<T, I, K, O> new StoragePath(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath), hoodieTable.getPartitionMetafileFormat()); - partitionMetadata.trySave(getPartitionId()); + partitionMetadata.trySave(); String newFileName = FSUtils.makeBaseFileName(instantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()); makeOldAndNewFilePaths(partitionPath, latestValidFilePath, newFileName); diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java index 56e38dc8ddf..19455773153 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowDataCreateHandle.java @@ -106,7 +106,7 @@ public class HoodieRowDataCreateHandle implements Serializable { new StoragePath(writeConfig.getBasePath()), FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath), table.getPartitionMetafileFormat()); - partitionMetadata.trySave(taskPartitionId); + partitionMetadata.trySave(); createMarkerFile(partitionPath, FSUtils.makeBaseFileName(this.instantTime, getWriteToken(), this.fileId, table.getBaseFileExtension())); this.fileWriter = createNewFileWriter(path, table, writeConfig, rowType); } catch (IOException e) { diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java index 98341bf62b4..890b12899f1 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowCreateHandle.java @@ -143,7 +143,7 @@ public class HoodieRowCreateHandle implements Serializable { new StoragePath(writeConfig.getBasePath()), FSUtils.getPartitionPath(writeConfig.getBasePath(), partitionPath), table.getPartitionMetafileFormat()); - partitionMetadata.trySave(taskPartitionId); + partitionMetadata.trySave(); createMarkerFile(partitionPath, fileName, instantTime, table, writeConfig); diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java index adeaaa5be4f..a90d05aefdd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodiePartitionMetadata.java @@ -20,6 +20,7 @@ package org.apache.hudi.common.model; import org.apache.hudi.common.util.BaseFileUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.RetryHelper; import org.apache.hudi.common.util.StringUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.HoodieIOException; @@ -30,12 +31,13 @@ import org.apache.hadoop.conf.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.UUID; import java.util.stream.Collectors; import 
java.util.stream.Stream; @@ -94,36 +96,29 @@ public class HoodiePartitionMetadata { /** * Write the metadata safely into partition atomically. */ - public void trySave(int taskPartitionId) { - String extension = getMetafileExtension(); - StoragePath tmpMetaPath = - new StoragePath(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + "_" + taskPartitionId + extension); - StoragePath metaPath = new StoragePath(partitionPath, HoodiePartitionMetadata.HOODIE_PARTITION_METAFILE_PREFIX + extension); - boolean metafileExists = false; + public void trySave() throws HoodieIOException { + StoragePath metaPath = new StoragePath( + partitionPath, HOODIE_PARTITION_METAFILE_PREFIX + getMetafileExtension()); - try { - metafileExists = storage.exists(metaPath); - if (!metafileExists) { - // write to temporary file - writeMetafile(tmpMetaPath); - // move to actual path - storage.rename(tmpMetaPath, metaPath); - } - } catch (IOException ioe) { - LOG.warn("Error trying to save partition metadata (this is okay, as long as at least 1 of these succeeded), " - + partitionPath, ioe); - } finally { - if (!metafileExists) { - try { - // clean up tmp file, if still lying around - if (storage.exists(tmpMetaPath)) { - storage.deleteFile(tmpMetaPath); + // This retry mechanism enables a fast exit in the metaPath exists check, which avoids the + // task failures when there are two or more tasks trying to create the same metaPath. 
+ RetryHelper<Void, HoodieIOException> retryHelper = new RetryHelper(1000, 3, 1000, HoodieIOException.class.getName()) + .tryWith(() -> { + if (!storage.exists(metaPath)) { + if (format.isPresent()) { + writeMetafileInFormat(metaPath, format.get()); + } else { + // Backwards compatible properties file format + try (ByteArrayOutputStream os = new ByteArrayOutputStream()) { + props.store(os, "partition metadata"); + Option content = Option.of(os.toByteArray()); + storage.createImmutableFileInPath(metaPath, content); + } + } } - } catch (IOException ioe) { - LOG.warn("Error trying to clean up temporary files for " + partitionPath, ioe); - } - } - } + return null; + }); + retryHelper.start(); } private String getMetafileExtension() { @@ -134,17 +129,26 @@ public class HoodiePartitionMetadata { /** * Write the partition metadata in the correct format in the given file path. * - * @param filePath path of the file to write. + * @param filePath Path of the file to write + * @param format Hoodie table file format * @throws IOException */ - private void writeMetafile(StoragePath filePath) throws IOException { - if (format.isPresent()) { - BaseFileUtils.getInstance(format.get()).writeMetaFile(storage, filePath, props); - } else { - // Backwards compatible properties file format - try (OutputStream os = storage.create(filePath, true)) { - props.store(os, "partition metadata"); - os.flush(); + private void writeMetafileInFormat(StoragePath filePath, HoodieFileFormat format) throws IOException { + StoragePath tmpPath = new StoragePath(partitionPath, + HOODIE_PARTITION_METAFILE_PREFIX + "_" + UUID.randomUUID() + getMetafileExtension()); + try { + // write to temporary file + BaseFileUtils.getInstance(format).writeMetaFile(storage, tmpPath, props); + // move to actual path + storage.rename(tmpPath, filePath); + } finally { + try { + // clean up tmp file, if still lying around + if (storage.exists(tmpPath)) { + storage.deleteFile(tmpPath); + } + } catch (IOException ioe) { + 
LOG.warn("Error trying to clean up temporary files for " + partitionPath, ioe); } } } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java index 0545fe392fc..3c8d6aa4306 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieActiveTimeline.java @@ -796,11 +796,7 @@ public class HoodieActiveTimeline extends HoodieDefaultTimeline { if (allowOverwrite || metaClient.getTimelineLayoutVersion().isNullVersion()) { FileIOUtils.createFileInPath(metaClient.getStorage(), fullPath, content); } else { - try { - metaClient.getStorage().createImmutableFileInPath(fullPath, content); - } catch (IOException e) { - throw new HoodieIOException("Cannot create immutable file: " + fullPath, e); - } + metaClient.getStorage().createImmutableFileInPath(fullPath, content); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java index 70474ec833f..ef01aa7deed 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/model/TestHoodiePartitionMetadata.java @@ -77,7 +77,7 @@ public class TestHoodiePartitionMetadata extends HoodieCommonTestHarness { HoodiePartitionMetadata writtenMetadata = new HoodiePartitionMetadata( metaClient.getStorage(), commitTime, new StoragePath(basePath), partitionPath, format); - writtenMetadata.trySave(0); + writtenMetadata.trySave(); // when HoodiePartitionMetadata readMetadata = new HoodiePartitionMetadata( diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java index 9cb2ab3bfb7..a7440f8993a 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java @@ -253,7 +253,8 @@ public class HoodieTestDataGenerator implements AutoCloseable { * @deprecated please use non-static version */ public static void writePartitionMetadataDeprecated(HoodieStorage storage, - String[] partitionPaths, String basePath) { + String[] partitionPaths, + String basePath) { new HoodieTestDataGenerator().writePartitionMetadata(storage, partitionPaths, basePath); } @@ -268,7 +269,7 @@ public class HoodieTestDataGenerator implements AutoCloseable { String basePath) { for (String partitionPath : partitionPaths) { new HoodiePartitionMetadata(storage, "000", new StoragePath(basePath), - new StoragePath(basePath, partitionPath), Option.empty()).trySave(0); + new StoragePath(basePath, partitionPath), Option.empty()).trySave(); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java index 2022ee8cfda..0db5c207463 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestTablePathUtils.java @@ -82,11 +82,11 @@ public final class TestTablePathUtils { HoodiePartitionMetadata partitionMetadata1 = new HoodiePartitionMetadata( storage, Instant.now().toString(), tablePath, partitionPath1, partitionMetafileFormat); - partitionMetadata1.trySave(1); + partitionMetadata1.trySave(); HoodiePartitionMetadata partitionMetadata2 = new HoodiePartitionMetadata( storage, Instant.now().toString(), tablePath, partitionPath2, partitionMetafileFormat); - partitionMetadata2.trySave(2); + partitionMetadata2.trySave(); // Create files URI filePathURI1 = 
diff --git a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java index f208bd0e3c6..cfdd6c88395 100644 --- a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java +++ b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java @@ -512,7 +512,7 @@ public class InputFormatTestUtil { new StoragePath(partitionPath.toAbsolutePath().toString()), Option.of(HoodieFileFormat.PARQUET)); - partitionMetadata.trySave((int) (Math.random() * 1000)); + partitionMetadata.trySave(); } } diff --git a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java index be160caba3b..b8735cc89d9 100644 --- a/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java +++ b/hudi-io/src/main/java/org/apache/hudi/storage/HoodieStorage.java @@ -257,7 +257,7 @@ public abstract class HoodieStorage implements Closeable { */ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING) public final void createImmutableFileInPath(StoragePath path, - Option<byte[]> content) throws IOException { + Option<byte[]> content) throws HoodieIOException { OutputStream fsout = null; StoragePath tmpPath = null; diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala index d13895af414..03ef6cc3f54 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairAddpartitionmetaProcedure.scala @@ -70,7 +70,7 @@ class RepairAddpartitionmetaProcedure extends 
BaseProcedure with ProcedureBuilde if (!dryRun) { val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata( metaClient.getStorage, latestCommit, basePath, partitionPath, metaClient.getTableConfig.getPartitionMetafileFormat) - partitionMetadata.trySave(0) + partitionMetadata.trySave() action = "Repaired" } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala index 5651055ee99..07fd7c92a68 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/RepairMigratePartitionMetaProcedure.scala @@ -78,7 +78,7 @@ class RepairMigratePartitionMetaProcedure extends BaseProcedure with ProcedureBu if (!baseFormatFile.isPresent) { val partitionMetadata: HoodiePartitionMetadata = new HoodiePartitionMetadata(metaClient.getStorage, latestCommit, basePath, partition, Option.of(metaClient.getTableConfig.getBaseFileFormat)) - partitionMetadata.trySave(0) + partitionMetadata.trySave() } // delete it, in case we failed midway last time. textFormatFile.ifPresent(
