This is an automated email from the ASF dual-hosted git repository. danny0405 pushed a commit to branch release-0.10.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit e131507d4672316b6a0f2b2f3f63d8ad34e49b85 Author: Sivabalan Narayanan <[email protected]> AuthorDate: Sat Dec 4 00:44:50 2021 -0500 [HUDI-2923] Fixing metadata table reader when metadata compaction is inflight (#4206) * [HUDI-2923] Fixing metadata table reader when metadata compaction is inflight * Fixing retry of pending compaction in metadata table and enhancing tests (cherry picked from commit 1d4fb827e73b2ae510f0e2c6510a448c0b5bd5b3) --- .../hudi/client/AbstractHoodieWriteClient.java | 7 ++ .../metadata/HoodieBackedTableMetadataWriter.java | 6 +- .../FlinkHoodieBackedTableMetadataWriter.java | 2 +- .../SparkHoodieBackedTableMetadataWriter.java | 2 +- .../functional/TestHoodieBackedMetadata.java | 86 ++++++++++++++++++++++ .../hudi/metadata/HoodieBackedTableMetadata.java | 2 +- .../hudi/metadata/HoodieTableMetadataUtil.java | 8 +- .../hudi/common/testutils/FileCreateUtils.java | 10 +++ 8 files changed, 117 insertions(+), 6 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java index 59acbb2..5c2bee1 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java @@ -520,6 +520,13 @@ public abstract class AbstractHoodieWriteClient<T extends HoodieRecordPayload, I } /** + * Run any pending compactions. + */ + public void runAnyPendingCompactions() { + runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled())); + } + + /** * Create a savepoint based on the latest commit action on the timeline. 
* * @param user - User creating the savepoint diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 54284fc..eb4b24a 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -682,7 +682,10 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta * deltacommit. */ protected void compactIfNecessary(AbstractHoodieWriteClient writeClient, String instantTime) { - String latestDeltacommitTime = metadataMetaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant() + // finish off any pending compactions if any from previous attempt. + writeClient.runAnyPendingCompactions(); + + String latestDeltacommitTime = metadataMetaClient.reloadActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants().lastInstant() .get().getTimestamp(); List<HoodieInstant> pendingInstants = dataMetaClient.reloadActiveTimeline().filterInflightsAndRequested() .findInstantsBefore(latestDeltacommitTime).getInstants().collect(Collectors.toList()); @@ -693,6 +696,7 @@ public abstract class HoodieBackedTableMetadataWriter implements HoodieTableMeta return; } + // Trigger compaction with suffixes based on the same instant time. This ensures that any future // delta commits synced over will not have an instant time lesser than the last completed instant on the // metadata table. 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java index 0dcfcfc..d11f570 100644 --- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/metadata/FlinkHoodieBackedTableMetadataWriter.java @@ -154,7 +154,7 @@ public class FlinkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad * The record is tagged with respective file slice's location based on its record key. */ private List<HoodieRecord> prepRecords(List<HoodieRecord> records, String partitionName, int numFileGroups) { - List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName); + List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false); ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups)); return records.stream().map(r -> { diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java index 65ade82..b7b5961 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java @@ -169,7 +169,7 @@ public class SparkHoodieBackedTableMetadataWriter extends HoodieBackedTableMetad * The record is tagged with respective file slice's location based on its record key. 
*/ private JavaRDD<HoodieRecord> prepRecords(JavaRDD<HoodieRecord> recordsRDD, String partitionName, int numFileGroups) { - List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName); + List<FileSlice> fileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, false); ValidationUtils.checkArgument(fileSlices.size() == numFileGroups, String.format("Invalid number of file groups: found=%d, required=%d", fileSlices.size(), numFileGroups)); return recordsRDD.map(r -> { diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java index ed245da..0339c47 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java @@ -54,6 +54,7 @@ import org.apache.hudi.common.table.view.FileSystemViewStorageType; import org.apache.hudi.common.table.view.HoodieTableFileSystemView; import org.apache.hudi.common.table.view.TableFileSystemView; import org.apache.hudi.common.testutils.FileCreateUtils; +import org.apache.hudi.common.testutils.HoodieMetadataTestTable; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.util.HoodieTimer; @@ -413,6 +414,91 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase { }); } + + /** + * Tests that metadata-table reads and compaction retry behave correctly when a metadata table compaction is left pending (inflight). 
+ * + */ + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testMetadataTableWithPendingCompaction(boolean simulateFailedCompaction) throws Exception { + HoodieTableType tableType = COPY_ON_WRITE; + init(tableType, false); + writeConfig = getWriteConfigBuilder(true, true, false) + .withMetadataConfig(HoodieMetadataConfig.newBuilder() + .enable(true) + .enableFullScan(true) + .enableMetrics(false) + .withMaxNumDeltaCommitsBeforeCompaction(3) + .build()).build(); + initWriteConfigAndMetatableWriter(writeConfig, true); + + doWriteOperation(testTable, "0000001", INSERT); + // create an inflight compaction in metadata table. + // not easy to create an inflight in metadata table directly, hence letting compaction succeed and then deleting the completed instant. + // this new write is expected to trigger metadata table compaction + String commitInstant = "0000002"; + doWriteOperation(testTable, commitInstant, INSERT); + + HoodieTableMetadata tableMetadata = metadata(writeConfig, context); + String metadataCompactionInstant = commitInstant + "001"; + assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); + assertEquals(tableMetadata.getLatestCompactionTime().get(), metadataCompactionInstant); + + validateMetadata(testTable); + // Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets + // for future upserts. so, renaming the file here to some temp name and later renaming it back to same name. 
+ java.nio.file.Path parentPath = Paths.get(metadataTableBasePath, HoodieTableMetaClient.METAFOLDER_NAME); + java.nio.file.Path metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION); + java.nio.file.Path tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant); + metaClient.reloadActiveTimeline(); + testTable = HoodieMetadataTestTable.of(metaClient, metadataWriter); + // this validation will exercise the code path where a compaction is inflight in metadata table, but still metadata based file listing should match non + // metadata based file listing. + validateMetadata(testTable); + + if (simulateFailedCompaction) { + // this should retry the compaction in metadata table. + doWriteOperation(testTable, "0000003", INSERT); + } else { + // let the compaction succeed in metadata and validation should succeed. + FileCreateUtils.renameTempToMetaFile(tempFilePath, metaFilePath); + } + + validateMetadata(testTable); + + // add few more write and validate + doWriteOperation(testTable, "0000004", INSERT); + doWriteOperation(testTable, "0000005", UPSERT); + validateMetadata(testTable); + + if (simulateFailedCompaction) { + //trigger another compaction failure. + metadataCompactionInstant = "0000005001"; + tableMetadata = metadata(writeConfig, context); + assertTrue(tableMetadata.getLatestCompactionTime().isPresent()); + assertEquals(tableMetadata.getLatestCompactionTime().get(), metadataCompactionInstant); + + // Fetch compaction Commit file and rename to some other file. completed compaction meta file should have some serialized info that table interprets + // for future upserts. so, renaming the file here to some temp name and later renaming it back to same name. 
+ parentPath = Paths.get(metadataTableBasePath, HoodieTableMetaClient.METAFOLDER_NAME); + metaFilePath = parentPath.resolve(metadataCompactionInstant + HoodieTimeline.COMMIT_EXTENSION); + tempFilePath = FileCreateUtils.renameFileToTemp(metaFilePath, metadataCompactionInstant); + + validateMetadata(testTable); + + // this should retry the failed compaction in metadata table. + doWriteOperation(testTable, "0000006", INSERT); + + validateMetadata(testTable); + + // add few more write and validate + doWriteOperation(testTable, "0000007", INSERT); + doWriteOperation(testTable, "0000008", UPSERT); + validateMetadata(testTable); + } + } + /** * Test rollback of various table operations sync to Metadata Table correctly. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java index 8a9f855..58c25a1 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java @@ -245,7 +245,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata { // Metadata is in sync till the latest completed instant on the dataset HoodieTimer timer = new HoodieTimer().startTimer(); - List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName); + List<FileSlice> latestFileSlices = HoodieTableMetadataUtil.loadPartitionFileGroupsWithLatestFileSlices(metadataMetaClient, partitionName, true); if (latestFileSlices.size() == 0) { // empty partition return Pair.of(null, null); diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index 9078bd0..b4dfbbd 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.Map; import java.util.function.BiFunction; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME; @@ -338,9 +339,10 @@ public class HoodieTableMetadataUtil { * The list of file slices returned is sorted in the correct order of file group name. * @param metaClient instance of {@link HoodieTableMetaClient}. * @param partition The name of the partition whose file groups are to be loaded. + * @param isReader true if reader code path, false otherwise. * @return List of latest file slices for all file groups in a given partition. */ - public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition) { + public static List<FileSlice> loadPartitionFileGroupsWithLatestFileSlices(HoodieTableMetaClient metaClient, String partition, boolean isReader) { LOG.info("Loading file groups for metadata table partition " + partition); // If there are no commits on the metadata table then the table's default FileSystemView will not return any file @@ -352,7 +354,9 @@ public class HoodieTableMetadataUtil { } HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(metaClient, timeline); - return fsView.getLatestFileSlices(partition).sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())) + Stream<FileSlice> fileSliceStream = isReader ? 
fsView.getLatestMergedFileSlicesBeforeOrOn(partition, timeline.filterCompletedInstants().lastInstant().get().getTimestamp()) : + fsView.getLatestFileSlices(partition); + return fileSliceStream.sorted((s1, s2) -> s1.getFileId().compareTo(s2.getFileId())) .collect(Collectors.toList()); } } diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java index 15215b8..486c473 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java @@ -344,6 +344,16 @@ public class FileCreateUtils { removeMetaFile(basePath, instantTime, HoodieTimeline.ROLLBACK_EXTENSION); } + public static java.nio.file.Path renameFileToTemp(java.nio.file.Path sourcePath, String instantTime) throws IOException { + java.nio.file.Path dummyFilePath = sourcePath.getParent().resolve(instantTime + ".temp"); + Files.move(sourcePath, dummyFilePath); + return dummyFilePath; + } + + public static void renameTempToMetaFile(java.nio.file.Path tempFilePath, java.nio.file.Path destPath) throws IOException { + Files.move(tempFilePath, destPath); + } + public static long getTotalMarkerFileCount(String basePath, String partitionPath, String instantTime, IOType ioType) throws IOException { Path parentPath = Paths.get(basePath, HoodieTableMetaClient.TEMPFOLDER_NAME, instantTime, partitionPath); if (Files.notExists(parentPath)) {
