This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 07f271551153ebd6b19058d652ad26bf0b111215
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Apr 9 21:58:22 2024 -0700

    [HUDI-7556] Fixing false positive validation with MDT validator (#10986)
---
 .../utilities/HoodieMetadataTableValidator.java |  96 +++++++++------
 .../TestHoodieMetadataTableValidator.java       | 131 ++++++++++++++++++++-
 2 files changed, 187 insertions(+), 40 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index bbe8610abe3..924132df2da 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -52,6 +52,7 @@ import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -514,7 +515,9 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
 
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-    List<String> allPartitions = validatePartitions(engineContext, basePath);
+    // compare partitions
+
+    List<String> allPartitions = validatePartitions(engineContext, basePath, metaClient);
 
     if (allPartitions.isEmpty()) {
       LOG.warn("The result of getting all partitions is null or empty, skip current validation. {}", taskLabels);
@@ -612,39 +615,14 @@
   /**
    * Compare the listing partitions result between metadata table and fileSystem.
    */
-  private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
+  @VisibleForTesting
+  List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath, HoodieTableMetaClient metaClient) {
     // compare partitions
-    List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning);
     HoodieTimeline completedTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
+    List<String> allPartitionPathsFromFS = getPartitionsFromFileSystem(engineContext, basePath, metaClient.getFs(),
+        completedTimeline);
 
-    // ignore partitions created by uncommitted ingestion.
-    allPartitionPathsFromFS = allPartitionPathsFromFS.stream().parallel().filter(part -> {
-      HoodiePartitionMetadata hoodiePartitionMetadata =
-          new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, part));
-
-      Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
-      if (instantOption.isPresent()) {
-        String instantTime = instantOption.get();
-        // There are two cases where the created commit time is written to the partition metadata:
-        // (1) Commit C1 creates the partition and C1 succeeds, the partition metadata has C1 as
-        // the created commit time.
-        // (2) Commit C1 creates the partition, the partition metadata is written, and C1 fails
-        // during writing data files. Next time, C2 adds new data to the same partition after C1
-        // is rolled back. In this case, the partition metadata still has C1 as the created commit
-        // time, since Hudi does not rewrite the partition metadata in C2.
-        if (!completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
-          Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
-          return lastInstant.isPresent()
-              && HoodieTimeline.compareTimestamps(
-              instantTime, LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
-        }
-        return true;
-      } else {
-        return false;
-      }
-    }).collect(Collectors.toList());
-
-    List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true, cfg.assumeDatePartitioning);
+    List<String> allPartitionPathsMeta = getPartitionsFromMDT(engineContext, basePath);
 
     Collections.sort(allPartitionPathsFromFS);
     Collections.sort(allPartitionPathsMeta);
@@ -652,26 +630,23 @@
     if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size()
         || !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
       List<String> additionalFromFS = new ArrayList<>(allPartitionPathsFromFS);
-      additionalFromFS.remove(allPartitionPathsMeta);
+      additionalFromFS.removeAll(allPartitionPathsMeta);
       List<String> additionalFromMDT = new ArrayList<>(allPartitionPathsMeta);
-      additionalFromMDT.remove(allPartitionPathsFromFS);
+      additionalFromMDT.removeAll(allPartitionPathsFromFS);
       boolean misMatch = true;
       List<String> actualAdditionalPartitionsInMDT = new ArrayList<>(additionalFromMDT);
       if (additionalFromFS.isEmpty() && !additionalFromMDT.isEmpty()) {
         // there is a chance that when we polled MDT there could have been a new completed commit which was not complete when we polled FS based
         // listing. let's rule that out.
         additionalFromMDT.forEach(partitionFromDMT -> {
-
-          HoodiePartitionMetadata hoodiePartitionMetadata =
-              new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, partitionFromDMT));
-          Option<String> partitionCreationTimeOpt = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+          Option<String> partitionCreationTimeOpt = getPartitionCreationInstant(metaClient.getFs(), basePath, partitionFromDMT);
           // if creation time is greater than last completed instant in active timeline, we can ignore the additional partition from MDT.
           if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
             Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
             if (lastInstant.isPresent()
                 && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().getTimestamp())) {
               LOG.warn("Ignoring additional partition " + partitionFromDMT + ", as it was deduced to be part of a "
-                  + "latest completed commit which was inflighht when FS based listing was polled.");
+                  + "latest completed commit which was inflight when FS based listing was polled.");
               actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
             }
           }
@@ -689,10 +664,53 @@
         throw new HoodieValidationException(message);
       }
     }
-
     return allPartitionPathsMeta;
   }
 
+  @VisibleForTesting
+  Option<String> getPartitionCreationInstant(FileSystem fs, String basePath, String partition) {
+    HoodiePartitionMetadata hoodiePartitionMetadata =
+        new HoodiePartitionMetadata(fs, FSUtils.getPartitionPath(basePath, partition));
+    return hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+  }
+
+  @VisibleForTesting
+  List<String> getPartitionsFromMDT(HoodieEngineContext engineContext, String basePath) {
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, true);
+  }
+
+  @VisibleForTesting
+  List<String> getPartitionsFromFileSystem(HoodieEngineContext engineContext, String basePath,
+                                           FileSystem fs, HoodieTimeline completedTimeline) {
+    List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false);
+
+    // ignore partitions created by uncommitted ingestion.
+    return allPartitionPathsFromFS.stream().parallel().filter(part -> {
+      HoodiePartitionMetadata hoodiePartitionMetadata =
+          new HoodiePartitionMetadata(fs, FSUtils.getPartitionPath(basePath, part));
+      Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+      if (instantOption.isPresent()) {
+        String instantTime = instantOption.get();
+        // There are two cases where the created commit time is written to the partition metadata:
+        // (1) Commit C1 creates the partition and C1 succeeds, the partition metadata has C1 as
+        // the created commit time.
+        // (2) Commit C1 creates the partition, the partition metadata is written, and C1 fails
+        // during writing data files. Next time, C2 adds new data to the same partition after C1
+        // is rolled back. In this case, the partition metadata still has C1 as the created commit
+        // time, since Hudi does not rewrite the partition metadata in C2.
+        if (!completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
+          Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+          return lastInstant.isPresent()
+              && HoodieTimeline.compareTimestamps(
+              instantTime, LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }).collect(Collectors.toList());
+  }
+
   /**
    * Compare the file listing and index data between metadata table and fileSystem.
    * For now, validate five kinds of apis:
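A note on the core of the fix above: the false positive showed up when the metadata table listing contained a partition whose commit completed only after the file-system listing had been taken. validatePartitions now treats such an MDT-only partition as benign, but only when its partition-metadata creation instant sorts strictly after the last completed instant on the active timeline. Below is a minimal, hypothetical sketch of just that rule (the class and method names are illustrative, not the committed code); it leans on the fact that Hudi instant times are fixed-width timestamp strings, so a plain lexicographic comparison stands in for HoodieTimeline.compareTimestamps with GREATER_THAN:

    // Illustrative sketch only; names here are hypothetical, not Hudi APIs.
    public class MdtOnlyPartitionRule {

      // An MDT-only partition is a benign race rather than a validation error
      // when its creation instant is strictly later than the last completed
      // instant the file-system listing could have observed.
      static boolean canIgnore(String partitionCreationInstant, String lastCompletedInstant) {
        // Instant times like "20240409215822000" order lexicographically.
        return partitionCreationInstant.compareTo(lastCompletedInstant) > 0;
      }

      public static void main(String[] args) {
        // Partition committed after the FS listing was taken: safe to ignore.
        System.out.println(canIgnore("20240409215900000", "20240409215822000")); // true
        // Partition created before the last completed instant: a real mismatch.
        System.out.println(canIgnore("20240409215800000", "20240409215822000")); // false
      }
    }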
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index e87f6257c54..131a96329f0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -19,16 +19,32 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimeGenerator;
+import org.apache.hudi.common.table.timeline.TimeGenerators;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,14 +52,18 @@
 import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase {
 
   @Test
   public void testMetadataTableValidation() {
-    Map<String,String> writeOptions = new HashMap<>();
+    Map<String, String> writeOptions = new HashMap<>();
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put("hoodie.table.name", "test_table");
     writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
@@ -73,6 +93,115 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     assertTrue(validator.getThrowables().isEmpty());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+
+    // constructor of HoodieMetadataValidator instantiates HoodieTableMetaClient. hence creating an actual table. but rest of tests is mocked.
+    Dataset<Row> inserts = makeInsertDf("000", 5).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    MockHoodieMetadataTableValidator validator = new MockHoodieMetadataTableValidator(jsc, config);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+
+    String partition1 = "PARTITION1";
+    String partition2 = "PARTITION2";
+    String partition3 = "PARTITION3";
+
+    // mock list of partitions to return from MDT to have 1 additional partition compared to FS based listing.
+    List<String> mdtPartitions = Arrays.asList(partition1, partition2, partition3);
+    validator.setMetadataPartitionsToReturn(mdtPartitions);
+    List<String> fsPartitions = Arrays.asList(partition1, partition2);
+    validator.setFsPartitionsToReturn(fsPartitions);
+
+    // mock completed timeline.
+    HoodieTimeline commitsTimeline = mock(HoodieTimeline.class);
+    HoodieTimeline completedTimeline = mock(HoodieTimeline.class);
+    when(metaClient.getCommitsTimeline()).thenReturn(commitsTimeline);
+    when(commitsTimeline.filterCompletedInstants()).thenReturn(completedTimeline);
+
+    TimeGenerator timeGenerator = TimeGenerators
+        .getTimeGenerator(HoodieTimeGeneratorConfig.defaultConfig(basePath), jsc.hadoopConfiguration());
+
+    if (testFailureCase) {
+      // 3rd partition which is additional in MDT should have creation time before last instant in timeline.
+
+      String partition3CreationTime = HoodieActiveTimeline.createNewInstantTime(true, timeGenerator);
+      Thread.sleep(100);
+      String lastIntantCreationTime = HoodieActiveTimeline.createNewInstantTime(true, timeGenerator);
+
+      HoodieInstant lastInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, lastIntantCreationTime);
+      when(completedTimeline.lastInstant()).thenReturn(Option.of(lastInstant));
+      validator.setPartitionCreationTime(Option.of(partition3CreationTime));
+      // validate that exception is thrown since MDT has one additional partition.
+      assertThrows(HoodieValidationException.class, () -> {
+        validator.validatePartitions(engineContext, basePath, metaClient);
+      });
+    } else {
+      // 3rd partition creation time is > last completed instant
+      HoodieInstant lastInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime(true, timeGenerator));
+      when(completedTimeline.lastInstant()).thenReturn(Option.of(lastInstant));
+      Thread.sleep(100);
+      validator.setPartitionCreationTime(Option.of(HoodieActiveTimeline.createNewInstantTime(true, timeGenerator)));
+
+      // validate that all 3 partitions are returned
+      assertEquals(mdtPartitions, validator.validatePartitions(engineContext, basePath, metaClient));
+    }
+  }
+
+  class MockHoodieMetadataTableValidator extends HoodieMetadataTableValidator {
+
+    private List<String> metadataPartitionsToReturn;
+    private List<String> fsPartitionsToReturn;
+    private Option<String> partitionCreationTime;
+
+    public MockHoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
+      super(jsc, cfg);
+    }
+
+    void setMetadataPartitionsToReturn(List<String> metadataPartitionsToReturn) {
+      this.metadataPartitionsToReturn = metadataPartitionsToReturn;
+    }
+
+    void setFsPartitionsToReturn(List<String> fsPartitionsToReturn) {
+      this.fsPartitionsToReturn = fsPartitionsToReturn;
+    }
+
+    void setPartitionCreationTime(Option<String> partitionCreationTime) {
+      this.partitionCreationTime = partitionCreationTime;
+    }
+
+    @Override
+    List<String> getPartitionsFromFileSystem(HoodieEngineContext engineContext, String basePath, FileSystem fs, HoodieTimeline completedTimeline) {
+      return fsPartitionsToReturn;
+    }
+
+    @Override
+    List<String> getPartitionsFromMDT(HoodieEngineContext engineContext, String basePath) {
+      return metadataPartitionsToReturn;
+    }
+
+    @Override
+    Option<String> getPartitionCreationInstant(FileSystem fs, String basePath, String partition) {
+      return this.partitionCreationTime;
+    }
+  }
+
   protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
     List<String> records = dataGen.generateInserts(instantTime, n).stream()
         .map(r -> recordToString(r).get()).collect(Collectors.toList());
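A second detail worth calling out, from the hunk at old line 652 of the main file: the partition diff was previously computed with java.util.List.remove(Object), passing a whole List as the argument. remove(Object) searches for the argument itself as a single element, finds no List inside a list of partition path strings, and silently removes nothing, so the "additional partitions" lists were never actually narrowed. removeAll(Collection), which the commit switches to, performs the intended set difference. A self-contained sketch of the two behaviors (the partition names are hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RemoveVsRemoveAll {
      public static void main(String[] args) {
        List<String> fromFS = Arrays.asList("p1", "p2", "p3");
        List<String> fromMDT = Arrays.asList("p1", "p2");

        // Bug: remove(Object) looks for fromMDT itself as one element;
        // nothing matches, so the list is left untouched.
        List<String> buggy = new ArrayList<>(fromFS);
        buggy.remove(fromMDT);
        System.out.println(buggy); // [p1, p2, p3]

        // Fix: removeAll(Collection) drops every element of fromMDT,
        // leaving only the partitions unique to the FS listing.
        List<String> fixed = new ArrayList<>(fromFS);
        fixed.removeAll(fromMDT);
        System.out.println(fixed); // [p3]
      }
    }

With remove(Object), additionalFromFS stayed equal to the full FS listing for any non-empty table, so the branch that rules out the in-flight-commit race could not be entered; fixing the set difference is what lets the new creation-time check take effect.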
