This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ed0462b49ada72f1c30ef3fb0bba09f14c94f231
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 05:44:06 2024 -0700

    [HUDI-7673] Fixing false positive validation failure for RLI with MDT validation tool (#11098)
---
 .../utilities/HoodieMetadataTableValidator.java    | 118 ++++++++++++++-------
 .../TestHoodieMetadataTableValidator.java          | 118 ++++++++++++++++++++-
 2 files changed, 195 insertions(+), 41 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 62a42e56964..0ec37e4a8fa 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.utilities;
 
+import org.apache.hudi.DataSourceReadOptions;
 import org.apache.hudi.async.HoodieAsyncService;
 import org.apache.hudi.avro.model.HoodieCleanerPlan;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
@@ -37,7 +38,6 @@ import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -67,6 +67,7 @@ import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.utilities.util.BloomFilterData;
 
 import com.beust.jcommander.JCommander;
@@ -77,6 +78,7 @@ import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.Optional;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -101,6 +103,10 @@ import java.util.stream.Collectors;
 
 import scala.Tuple2;
 
+import static org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
+import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
 import static org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 
@@ -540,7 +546,7 @@ public class HoodieMetadataTableValidator implements Serializable {
       }).collectAsList());
 
       try {
-        validateRecordIndex(engineContext, metaClient, metadataTableBasedContext.getTableMetadata());
+        validateRecordIndex(engineContext, metaClient);
         result.add(Pair.of(true, null));
       } catch (HoodieValidationException e) {
         LOG.error(
@@ -638,7 +644,7 @@ public class HoodieMetadataTableValidator implements Serializable {
     if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
       Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
       if (lastInstant.isPresent()
-          && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), HoodieTimeline.GREATER_THAN, lastInstant.get().getTimestamp())) {
+          && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().getTimestamp())) {
         LOG.warn("Ignoring additional partition {}, as it was deduced to be part of a "
             + "latest completed commit which was inflight when FS based listing was polled.", partitionFromDMT);
         actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
@@ -886,10 +892,12 @@ public class HoodieMetadataTableValidator implements Serializable {
   }
 
   private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
-                                   HoodieTableMetaClient metaClient,
-                                   HoodieTableMetadata tableMetadata) {
+                                   HoodieTableMetaClient metaClient) {
+    if (!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) {
+      return;
+    }
     if (cfg.validateRecordIndexContent) {
-      validateRecordIndexContent(sparkEngineContext, metaClient, tableMetadata);
+      validateRecordIndexContent(sparkEngineContext, metaClient);
     } else if (cfg.validateRecordIndexCount) {
       validateRecordIndexCount(sparkEngineContext, metaClient);
     }
@@ -898,11 +906,15 @@ public class HoodieMetadataTableValidator implements Serializable {
   private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext,
                                         HoodieTableMetaClient metaClient) {
     String basePath = metaClient.getBasePathV2().toString();
+    String latestCompletedCommit = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline()
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
     long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
         .load(basePath)
-        .select(HoodieRecord.RECORD_KEY_METADATA_FIELD)
+        .select(RECORD_KEY_METADATA_FIELD)
         .count();
     long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
        .load(getMetadataTableBasePath(basePath))
         .select("key")
         .filter("type = 5")
@@ -919,43 +931,15 @@ public class HoodieMetadataTableValidator implements Serializable {
   }
 
   private void validateRecordIndexContent(HoodieSparkEngineContext sparkEngineContext,
-                                          HoodieTableMetaClient metaClient,
-                                          HoodieTableMetadata tableMetadata) {
+                                          HoodieTableMetaClient metaClient) {
     String basePath = metaClient.getBasePathV2().toString();
+    String latestCompletedCommit = metaClient.getActiveTimeline().getCommitsAndCompactionTimeline()
+        .filterCompletedInstants().lastInstant().get().getTimestamp();
     JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
-        sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
-            .select(HoodieRecord.RECORD_KEY_METADATA_FIELD, HoodieRecord.PARTITION_PATH_METADATA_FIELD, HoodieRecord.FILENAME_METADATA_FIELD)
-            .toJavaRDD()
-            .mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(HoodieRecord.RECORD_KEY_METADATA_FIELD)),
-                Pair.of(row.getString(row.fieldIndex(HoodieRecord.PARTITION_PATH_METADATA_FIELD)),
-                    FSUtils.getFileId(row.getString(row.fieldIndex(HoodieRecord.FILENAME_METADATA_FIELD))))))
-            .cache();
+        getRecordLocationsFromFSBasedListing(sparkEngineContext, basePath, latestCompletedCommit);
 
     JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
-        sparkEngineContext.getSqlContext().read().format("hudi")
-            .load(getMetadataTableBasePath(basePath))
-            .filter("type = 5")
-            .select(functions.col("key"),
-                functions.col("recordIndexMetadata.partitionName").as("partitionName"),
-                functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
-                functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
-                functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
-                functions.col("recordIndexMetadata.fileId").as("fileId"),
-                functions.col("recordIndexMetadata.instantTime").as("instantTime"),
-                functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
-            .toJavaRDD()
-            .mapToPair(row -> {
-              HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
-                  row.getString(row.fieldIndex("partitionName")),
-                  row.getInt(row.fieldIndex("fileIdEncoding")),
-                  row.getLong(row.fieldIndex("fileIdHighBits")),
-                  row.getLong(row.fieldIndex("fileIdLowBits")),
-                  row.getInt(row.fieldIndex("fileIndex")),
-                  row.getString(row.fieldIndex("fileId")),
-                  row.getLong(row.fieldIndex("instantTime")));
-              return new Tuple2<>(row.getString(row.fieldIndex("key")),
-                  Pair.of(location.getPartitionPath(), location.getFileId()));
-            });
+        getRecordLocationsFromRLI(sparkEngineContext, basePath, latestCompletedCommit);
 
     int numErrorSamples = cfg.numRecordIndexErrorSamples;
     Pair<Long, List<String>> result = keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, cfg.recordIndexParallelism)
@@ -1032,6 +1016,60 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
   }
 
+  @VisibleForTesting
+  JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromFSBasedListing(HoodieSparkEngineContext sparkEngineContext,
+                                                                                 String basePath,
+                                                                                 String latestCompletedCommit) {
+    return sparkEngineContext.getSqlContext().read().format("hudi")
+        .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT().key(), latestCompletedCommit)
+        .load(basePath)
+        .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD)
+        .toJavaRDD()
+        .mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+            Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+                FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))))))
+        .cache();
+  }
+
+  @VisibleForTesting
+  JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromRLI(HoodieSparkEngineContext sparkEngineContext,
+                                                                      String basePath,
+                                                                      String latestCompletedCommit) {
+    return sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(getMetadataTableBasePath(basePath))
+        .filter("type = 5")
+        .select(functions.col("key"),
+            functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+            functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+            functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+            functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+            functions.col("recordIndexMetadata.fileId").as("fileId"),
+            functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+            functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+        .toJavaRDD()
+        .map(row -> {
+          HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+              row.getString(row.fieldIndex("partitionName")),
+              row.getInt(row.fieldIndex("fileIdEncoding")),
+              row.getLong(row.fieldIndex("fileIdHighBits")),
+              row.getLong(row.fieldIndex("fileIdLowBits")),
+              row.getInt(row.fieldIndex("fileIndex")),
+              row.getString(row.fieldIndex("fileId")),
+              row.getLong(row.fieldIndex("instantTime")));
+          // Handle the false-positive case: a commit was pending when FS-based locations were
+          // fetched, but completed by the time the MDT was polled.
+          if (HoodieTimeline.compareTimestamps(location.getInstantTime(), GREATER_THAN, latestCompletedCommit)) {
+            return new Tuple2<>(row, Option.empty());
+          } else {
+            return new Tuple2<>(row, Option.of(location));
+          }
+        }).filter(tuple2 -> tuple2._2.isPresent()) // filter out the false positives
+        .mapToPair(tuple2 -> {
+          Tuple2<Row, Option<HoodieRecordGlobalLocation>> rowAndLocation = (Tuple2<Row, Option<HoodieRecordGlobalLocation>>) tuple2;
+          return new Tuple2<>(rowAndLocation._1.getString(rowAndLocation._1.fieldIndex("key")),
+              Pair.of(rowAndLocation._2.get().getPartitionPath(), rowAndLocation._2.get().getFileId()));
+        }).cache();
+  }
+
   private String constructLocationInfoString(String recordKey, Optional<Pair<String, String>> locationOnFs,
                                              Optional<Pair<String, String>> locationFromRecordIndex) {
     StringBuilder sb = new StringBuilder();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index dd6ee4730ba..a9af0146db1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -27,10 +28,16 @@ import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
+import jodd.io.FileUtil;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
@@ -59,7 +66,6 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
 
   @Test
   public void testMetadataTableValidation() {
-
     Map<String, String> writeOptions = new HashMap<>();
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put("hoodie.table.name", "test_table");
@@ -71,11 +77,17 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     Dataset<Row> inserts = makeInsertDf("000", 5).cache();
     inserts.write().format("hudi").options(writeOptions)
         .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
         .mode(SaveMode.Overwrite)
         .save(basePath);
     Dataset<Row> updates = makeUpdateDf("001", 5).cache();
     updates.write().format("hudi").options(writeOptions)
         .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
         .mode(SaveMode.Append)
         .save(basePath);
 
@@ -196,6 +208,110 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     }
   }
 
+  @Test
+  public void testRliValidationFalsePositiveCase() throws IOException {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+
+    Dataset<Row> inserts = makeInsertDf("000", 5).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+    Dataset<Row> updates = makeUpdateDf("001", 5).cache();
+    updates.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.UPSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    Dataset<Row> inserts2 = makeInsertDf("002", 5).cache();
+    inserts2.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .option(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MIN_FILE_GROUP_COUNT_PROP.key(), "1")
+        .option(HoodieMetadataConfig.RECORD_INDEX_MAX_FILE_GROUP_COUNT_PROP.key(), "1")
+        .mode(SaveMode.Append)
+        .save(basePath);
+
+    // validate MDT
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file://" + basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+
+    // Let's ensure we have a pending commit when FS-based polling is done, and that the commit
+    // completes by the time the MDT is polled.
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(HadoopFSUtils.getStorageConfWithCopy(jsc.hadoopConfiguration())).build();
+    // moving out the completed commit meta file to a temp location
+    HoodieInstant lastInstant = metaClient.getActiveTimeline().filterCompletedInstants().lastInstant().get();
+    String latestCompletedCommitMetaFile = basePath + "/.hoodie/" + lastInstant.getFileName();
+    String tempDir = getTempLocation();
+    String destFilePath = tempDir + "/" + lastInstant.getFileName();
+    FileUtil.move(latestCompletedCommitMetaFile, destFilePath);
+
+    MockHoodieMetadataTableValidatorForRli validator = new MockHoodieMetadataTableValidatorForRli(jsc, config);
+    validator.setOriginalFilePath(latestCompletedCommitMetaFile);
+    validator.setDestFilePath(destFilePath);
+    assertTrue(validator.run());
+    assertFalse(validator.hasValidationFailure());
+    assertTrue(validator.getThrowables().isEmpty());
+  }
+
+  /**
+   * Class to assist with testing a false positive case with RLI validation.
+   */
+  static class MockHoodieMetadataTableValidatorForRli extends HoodieMetadataTableValidator {
+
+    private String destFilePath;
+    private String originalFilePath;
+
+    public MockHoodieMetadataTableValidatorForRli(JavaSparkContext jsc, Config cfg) {
+      super(jsc, cfg);
+    }
+
+    @Override
+    JavaPairRDD<String, Pair<String, String>> getRecordLocationsFromRLI(HoodieSparkEngineContext sparkEngineContext,
+                                                                        String basePath,
+                                                                        String latestCompletedCommit) {
+      // move the completed file back to ".hoodie" to simulate the false-positive case.
+      try {
+        FileUtil.move(destFilePath, originalFilePath);
+        return super.getRecordLocationsFromRLI(sparkEngineContext, basePath, latestCompletedCommit);
+      } catch (IOException e) {
+        throw new HoodieException("Move should not have failed");
+      }
+    }
+
+    public void setDestFilePath(String destFilePath) {
+      this.destFilePath = destFilePath;
+    }
+
+    public void setOriginalFilePath(String originalFilePath) {
+      this.originalFilePath = originalFilePath;
+    }
+  }
+
+  private String getTempLocation() {
+    try {
+      String folderName = "temp_location";
+      java.nio.file.Path tempPath = tempDir.resolve(folderName);
+      java.nio.file.Files.createDirectories(tempPath);
+      return tempPath.toAbsolutePath().toString();
+    } catch (IOException ioe) {
+      throw new HoodieIOException(ioe.getMessage(), ioe);
+    }
+  }
+
   protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
     List<String> records = dataGen.generateInserts(instantTime, n).stream()
         .map(r -> recordToString(r).get()).collect(Collectors.toList());
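
The heart of the fix is the instant-time guard in getRecordLocationsFromRLI above: the FS-based
read is pinned to the latest completed commit via TIME_TRAVEL_AS_OF_INSTANT, and any record-index
entry written by a later commit is dropped before the join, so a commit that completes between the
two reads can no longer surface as a spurious mismatch. What follows is a minimal standalone
sketch of that guard, not Hudi code: RliEntry and visibleAsOf are hypothetical names introduced
for illustration. It leans on the fact that Hudi instant times are sortable timestamp strings, so
a plain lexicographic comparison suffices.

    import java.util.List;
    import java.util.stream.Collectors;

    public class RliFalsePositiveGuard {

      // Hypothetical stand-in for a record-index entry: the record key plus the
      // instant time of the commit that wrote it.
      record RliEntry(String recordKey, String instantTime) {}

      // Keep only entries visible as of the latest completed commit; anything newer
      // was written by a commit that was still pending when the FS listing was taken,
      // and would otherwise show up as a false-positive mismatch.
      static List<RliEntry> visibleAsOf(List<RliEntry> entries, String latestCompletedCommit) {
        return entries.stream()
            .filter(e -> e.instantTime().compareTo(latestCompletedCommit) <= 0)
            .collect(Collectors.toList());
      }

      public static void main(String[] args) {
        String latestCompletedCommit = "20240515054406000";
        List<RliEntry> rli = List.of(
            new RliEntry("key1", "20240515054406000"),  // written by the latest completed commit
            new RliEntry("key2", "20240515060000000")); // commit completed after the FS listing
        // Prints only key1's entry; key2 is filtered out as a false positive.
        System.out.println(visibleAsOf(rli, latestCompletedCommit));
      }
    }

The production code applies the same comparison through HoodieTimeline.compareTimestamps with the
GREATER_THAN predicate, as shown in the diff, rather than calling String.compareTo directly.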
