This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 773b385ba6a [HUDI-8337] Fixing failure handling with HoodieMetadataTableValidator (#12247)
773b385ba6a is described below
commit 773b385ba6a01e4909252adaf8b994681e1021ae
Author: Lin Liu <[email protected]>
AuthorDate: Wed Nov 20 18:46:18 2024 -0800
[HUDI-8337] Fixing failure handling with HoodieMetadataTableValidator (#12247)
---------
Co-authored-by: Sivabalan Narayanan <[email protected]>
---
.../utilities/HoodieMetadataTableValidator.java | 80 ++++--
.../TestHoodieMetadataTableValidator.java | 275 ++++++++++++++++-----
2 files changed, 276 insertions(+), 79 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5bdfa31821c..63079544886 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantComparison;
import org.apache.hudi.common.table.view.FileSystemViewManager;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -90,6 +91,7 @@ import com.beust.jcommander.Parameter;
import org.apache.avro.Schema;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -118,6 +120,7 @@ import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import scala.Tuple2;
@@ -506,6 +509,11 @@ public class HoodieMetadataTableValidator implements Serializable {
LOG.info(" ****** do hoodie metadata table validation once - {} ******", taskLabels);
result = doHoodieMetadataTableValidationOnce();
}
+ return result;
+ } catch (HoodieValidationException ve) {
+ if (!cfg.ignoreFailed) {
+ throw ve;
+ }
} catch (Exception e) {
throw new HoodieException("Unable to do hoodie metadata table validation in " + cfg.basePath, e);
} finally {
@@ -513,8 +521,8 @@ public class HoodieMetadataTableValidator implements Serializable {
if (asyncMetadataTableValidateService.isPresent()) {
asyncMetadataTableValidateService.get().shutdown(true);
}
- return result;
}
+ return result;
}
private boolean doHoodieMetadataTableValidationOnce() {
@@ -580,7 +588,6 @@ public class HoodieMetadataTableValidator implements Serializable {
// compare partitions
List<String> allPartitions = validatePartitions(engineContext, basePath, metaClient);
-
if (allPartitions.isEmpty()) {
LOG.warn("The result of getting all partitions is null or empty, skip
current validation. {}", taskLabels);
return true;
@@ -650,8 +657,14 @@ public class HoodieMetadataTableValidator implements
Serializable {
LOG.warn("Metadata table validation failed ({}).", taskLabels);
return false;
}
- } catch (HoodieValidationException e) {
- throw e;
+ } catch (HoodieValidationException validationException) {
+ throw validationException;
+ } catch (SparkException sparkException) {
+ if (sparkException.getCause() instanceof HoodieValidationException) {
+ throw (HoodieValidationException) sparkException.getCause();
+ } else {
+ throw new HoodieValidationException("Unexpected spark failure",
sparkException);
+ }
} catch (Exception e) {
LOG.warn("Error closing HoodieMetadataValidationContext, "
+ "ignoring the error as the validation is successful.", e);
@@ -719,30 +732,36 @@ public class HoodieMetadataTableValidator implements Serializable {
additionalFromFS.removeAll(allPartitionPathsMeta);
List<String> additionalFromMDT = new ArrayList<>(allPartitionPathsMeta);
additionalFromMDT.removeAll(allPartitionPathsFromFS);
- boolean misMatch = true;
+ AtomicBoolean misMatch = new AtomicBoolean(true);
List<String> actualAdditionalPartitionsInMDT = new ArrayList<>(additionalFromMDT);
if (additionalFromFS.isEmpty() && !additionalFromMDT.isEmpty()) {
// there is a chance that when we polled MDT there could have been a new completed commit which was not complete when we polled FS based
// listing. let's rule that out.
- additionalFromMDT.forEach(partitionFromDMT -> {
- Option<String> partitionCreationTimeOpt = getPartitionCreationInstant(metaClient.getStorage(), basePath, partitionFromDMT);
- // if creation time is greater than last completed instant in active timeline, we can ignore the additional partition from MDT.
- if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
- Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
- if (lastInstant.isPresent()
- && compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().requestedTime())) {
- LOG.warn("Ignoring additional partition {}, as it was deduced to be part of a "
- + "latest completed commit which was inflight when FS based listing was polled.", partitionFromDMT);
- actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
+ additionalFromMDT.forEach(partitionFromMDT -> {
+ try {
+ if (metaClient.getStorage().exists(new StoragePath(basePath + "/" + partitionFromMDT))) {
+ Option<String> partitionCreationTimeOpt = getPartitionCreationInstant(metaClient.getStorage(), basePath, partitionFromMDT);
+ // if creation time is greater than last completed instant in active timeline, we can ignore the additional partition from MDT.
+ if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
+ Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+ if (lastInstant.isPresent()
+ && InstantComparison.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().requestedTime())) {
+ LOG.warn("Ignoring additional partition " + partitionFromMDT + ", as it was deduced to be part of a "
+ + "latest completed commit which was inflight when FS based listing was polled.");
+ actualAdditionalPartitionsInMDT.remove(partitionFromMDT);
+ }
+ }
}
+ } catch (IOException e) {
+ throw new HoodieValidationException("IOException thrown while
trying to validate partition match b/w FS based listing and MDT based listing",
e);
}
});
// if there is no additional partitions from FS listing and only additional partitions from MDT based listing is due to a new commit, we are good
if (actualAdditionalPartitionsInMDT.isEmpty()) {
- misMatch = false;
+ misMatch.set(false);
}
}
- if (misMatch) {
+ if (misMatch.get()) {
String message = "Compare Partitions Failed! " + " Additional partitions from FS, but missing from MDT : \"" + additionalFromFS
+ "\" and additional partitions from MDT, but missing from FS listing : \"" + actualAdditionalPartitionsInMDT
+ "\".\n All partitions from FS listing " + allPartitionPathsFromFS;
@@ -979,7 +998,8 @@ public class HoodieMetadataTableValidator implements Serializable {
AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema()),
metadataTableBasedContext.getMetadataConfig(),
metaClient, false);
HoodieData<HoodieMetadataColumnStats> partitionStats =
- partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(), scala.Option.empty(), false)
+ partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(
+ metadataTableBasedContext.allColumnNameList).asScala().toSeq(), scala.Option.empty(), false)
// set isTightBound to false since partition stats generated using column stats does not contain the field
.map(colStat -> HoodieMetadataColumnStats.newBuilder(colStat).setIsTightBound(false).build());
JavaRDD<HoodieMetadataColumnStats> diffRDD = HoodieJavaRDD.getJavaRDD(partitionStats).subtract(HoodieJavaRDD.getJavaRDD(partitionStatsUsingColStats));
@@ -1068,7 +1088,8 @@ public class HoodieMetadataTableValidator implements Serializable {
validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters");
}
}
- private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+ @VisibleForTesting
+ void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
HoodieTableMetaClient metaClient) {
if (!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) {
return;
@@ -1385,8 +1406,11 @@ public class HoodieMetadataTableValidator implements Serializable {
FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i);
FileSlice fileSlice2 = fileSliceListFromFS.get(i);
if (!Objects.equals(fileSlice1.getFileGroupId(), fileSlice2.getFileGroupId())
- || !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime())
- || !Objects.equals(fileSlice1.getBaseFile(), fileSlice2.getBaseFile())) {
+ || !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime())) {
+ mismatch = true;
+ break;
+ }
+ if (!assertBaseFilesEquality(fileSlice1, fileSlice2)) {
mismatch = true;
break;
}
@@ -1412,6 +1436,20 @@ public class HoodieMetadataTableValidator implements Serializable {
}
}
+ private boolean assertBaseFilesEquality(FileSlice fileSlice1, FileSlice fileSlice2) {
+ if (fileSlice1.getBaseFile().isPresent() && fileSlice2.getBaseFile().isPresent()) {
+ HoodieBaseFile baseFile1 = fileSlice1.getBaseFile().get();
+ HoodieBaseFile baseFile2 = fileSlice2.getBaseFile().get();
+ return baseFile1.getFileName().equals(baseFile2.getFileName()) && baseFile1.getFileId().equals(baseFile2.getFileId())
+ && baseFile1.getFileSize() == baseFile2.getFileSize();
+ } else {
+ if (!fileSlice1.getBaseFile().isPresent() == fileSlice2.getBaseFile().isPresent()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
/**
* Compares committed log files from two file slices.
*
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index cfd40f90a50..166e4e8c5c0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -24,20 +24,23 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimeGenerator;
import org.apache.hudi.common.table.timeline.TimeGenerators;
import org.apache.hudi.common.table.timeline.TimelineUtils;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.exception.HoodieException;
@@ -47,6 +50,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
import org.apache.hudi.storage.HoodieStorage;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
import jodd.io.FileUtil;
@@ -82,10 +86,8 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static org.apache.hadoop.fs.FileUtil.copy;
-import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
-import static org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
import static org.apache.spark.sql.types.DataTypes.IntegerType;
import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -99,13 +101,7 @@ import static org.mockito.Mockito.when;
public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase {
private static Stream<Arguments> lastNFileSlicesTestArgs() {
- return Stream.of(
- Arguments.of(-1),
- Arguments.of(1),
- Arguments.of(3),
- Arguments.of(4),
- Arguments.of(5)
- );
+ return Stream.of(-1, 1, 3, 4, 5).flatMap(i -> Stream.of(Arguments.of(i, true), Arguments.of(i, false)));
}
private static Stream<Arguments> viewStorageArgs() {
@@ -200,7 +196,7 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
// validate MDT
HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
- config.basePath = basePath;
+ config.basePath = "file:" + basePath;
config.validateLatestFileSlices = true;
config.validateAllFileGroups = true;
if (viewStorageTypeForFSListing != null && viewStorageTypeForMDTListing != null) {
@@ -474,7 +470,7 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {
+ public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws IOException, InterruptedException {
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
@@ -489,6 +485,10 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
.mode(SaveMode.Overwrite)
.save(basePath);
+ String partition1 = "PARTITION1";
+ String partition2 = "PARTITION2";
+ String partition3 = "PARTITION3";
+
HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
config.basePath = basePath;
config.validateLatestFileSlices = true;
@@ -496,10 +496,11 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
MockHoodieMetadataTableValidator validator = new MockHoodieMetadataTableValidator(jsc, config);
HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
-
- String partition1 = "PARTITION1";
- String partition2 = "PARTITION2";
- String partition3 = "PARTITION3";
+ HoodieStorage storage = mock(HoodieHadoopStorage.class);
+ when(metaClient.getStorage()).thenReturn(storage);
+ when(storage.exists(new StoragePath(basePath + "/" + partition1))).thenReturn(true);
+ when(storage.exists(new StoragePath(basePath + "/" + partition2))).thenReturn(true);
+ when(storage.exists(new StoragePath(basePath + "/" + partition3))).thenReturn(true);
// mock list of partitions to return from MDT to have 1 additional partition compared to FS based listing.
List<String> mdtPartitions = Arrays.asList(partition1, partition2, partition3);
@@ -548,44 +549,20 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
@ParameterizedTest
@MethodSource("lastNFileSlicesTestArgs")
- public void testAdditionalFilesinMetadata(Integer lastNFileSlices) throws Exception {
+ public void testAdditionalFilesInMetadata(Integer lastNFileSlices, boolean ignoreFailed) throws IOException {
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
- writeOptions.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),"2");
+ writeOptions.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "2");
Dataset<Row> inserts = makeInsertDf("000", 10).cache();
inserts.write().format("hudi").options(writeOptions)
.mode(SaveMode.Overwrite)
.save(basePath);
- // Perform updates to generate log files
- inserts.write().format("hudi").options(writeOptions)
- .mode(SaveMode.Append)
- .save(basePath);
- HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
- HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(engineContext.getStorageConf()).build();
-
- // let's add a log file entry to the commit history and filesystem by directly modifying the commit so FS based listing and MDT based listing diverges.
- HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
- HoodieInstant instantToOverwrite = timeline.getInstants().get(1);
- HoodieCommitMetadata commitMetadata = COMMIT_METADATA_SER_DE.deserialize(instantToOverwrite, timeline.getInstantDetails(instantToOverwrite).get(), HoodieCommitMetadata.class);
- HoodieWriteStat writeStatToCopy = commitMetadata.getPartitionToWriteStats().entrySet().stream().flatMap(entry -> entry.getValue().stream())
- .filter(writeStat -> FSUtils.isLogFile(writeStat.getPath())).findFirst().get();
- String newLogFilePath = writeStatToCopy.getPath() + "1";
- HoodieWriteStat writeStatCopy = SerializationUtils.deserialize(SerializationUtils.serialize(writeStatToCopy));
- writeStatCopy.setPath(newLogFilePath);
- commitMetadata.addWriteStat(writeStatCopy.getPartitionPath(), writeStatCopy);
- FileSystem fs = HadoopFSUtils.getFs(newLogFilePath, new Configuration(false));
- fs.copyFromLocalFile(new Path(basePath, writeStatToCopy.getPath()), new Path(basePath, newLogFilePath));
- // remove the existing instant and rewrite with the new metadata
- assertTrue(fs.delete(new Path(basePath, String.format(".hoodie/%s", INSTANT_FILE_NAME_GENERATOR.getFileName(instantToOverwrite)))));
- timeline.saveAsComplete(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, instantToOverwrite.getAction(), instantToOverwrite.requestedTime(),
- instantToOverwrite.getCompletionTime()), serializeCommitMetadata(COMMIT_METADATA_SER_DE, commitMetadata));
-
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 6; i++) {
inserts.write().format("hudi").options(writeOptions)
.mode(SaveMode.Append)
.save(basePath);
@@ -595,15 +572,79 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
config.basePath = "file:" + basePath;
config.validateLatestFileSlices = true;
config.validateAllFileGroups = true;
- config.ignoreFailed = true;
+ config.ignoreFailed = ignoreFailed;
+ HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
- HoodieMetadataTableValidator.Config finalConfig = config;
- HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, finalConfig);
+ validator.run();
+ // assertFalse(validator.hasValidationFailure());
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(engineContext.getStorageConf()).build();
+
+ java.nio.file.Path tempFolderNioPath = tempDir.resolve("temp_folder");
+ java.nio.file.Files.createDirectories(tempFolderNioPath);
+ String tempFolder = tempFolderNioPath.toAbsolutePath().toString();
+ Path tempFolderPath = new Path(tempFolder);
+
+ // lets move one of the log files from latest file slice to the temp dir. so validation w/ latest file slice should fail.
+ HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+ metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), false);
+ FileSlice latestFileSlice = fsView.getLatestFileSlices(StringUtils.EMPTY_STRING).filter(fileSlice -> {
+ return fileSlice.getLogFiles().count() > 0;
+ }).collect(Collectors.toList()).get(0);
+ HoodieLogFile latestLogFile = latestFileSlice.getLogFiles().collect(Collectors.toList()).get(0);
+ FileSystem fs = HadoopFSUtils.getFs(new Path(latestLogFile.getPath().toString()), new Configuration(false));
+ fs.moveFromLocalFile(new Path(latestLogFile.getPath().toString()), tempFolderPath);
+
+ config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.ignoreFailed = ignoreFailed;
+
+ HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, config);
+ if (ignoreFailed) {
+ localValidator.run();
+ assertTrue(localValidator.hasValidationFailure());
+ assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+ } else {
+ assertThrows(HoodieValidationException.class, localValidator::run);
+ }
+
+ // lets move back the log file and validate validation suceeds.
+ fs.moveFromLocalFile(new Path(tempFolderPath + "/" + latestLogFile.getFileName()), new Path(basePath));
+ config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.ignoreFailed = ignoreFailed;
+
+ localValidator = new HoodieMetadataTableValidator(jsc, config);
localValidator.run();
- assertTrue(localValidator.hasValidationFailure());
- assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+ // no exception should be thrown
+
+ // let's delete one of the log files from 1st commit and so FS based listing and MDT based listing diverges when all file slices are validated.
+ fsView = new HoodieTableFileSystemView(
+ metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), false);
+ HoodieFileGroup fileGroup = fsView.getAllFileGroups(StringUtils.EMPTY_STRING).collect(Collectors.toList()).get(0);
+ List<FileSlice> allFileSlices = fileGroup.getAllFileSlices().collect(Collectors.toList());
+ FileSlice earliestFileSlice = allFileSlices.get(allFileSlices.size() - 1);
+ HoodieLogFile earliestLogFile = earliestFileSlice.getLogFiles().collect(Collectors.toList()).get(0);
+ fs.delete(new Path(earliestLogFile.getPath().toString()));
- // lets set lastN file slices to 2 and so validation should succeed. (bcoz, there will be mis-match only on first file slice)
+ config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.validateAllFileGroups = true;
+ config.ignoreFailed = ignoreFailed;
+ HoodieMetadataTableValidator.Config finalConfig = config;
+ localValidator = new HoodieMetadataTableValidator(jsc, finalConfig);
+ if (ignoreFailed) {
+ localValidator.run();
+ assertTrue(localValidator.hasValidationFailure());
+ assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+ } else {
+ assertThrows(HoodieValidationException.class, localValidator::run);
+ }
+
+ // lets set lastN file slices to argument value and so validation should succeed. (bcoz, there will be mis-match only on first file slice)
config = new HoodieMetadataTableValidator.Config();
config.basePath = "file:" + basePath;
config.validateLatestFileSlices = true;
@@ -611,14 +652,132 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
if (lastNFileSlices != -1) {
config.validateLastNFileSlices = lastNFileSlices;
}
- config.ignoreFailed = true;
- HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
- validator.run();
+ config.ignoreFailed = ignoreFailed;
+ validator = new HoodieMetadataTableValidator(jsc, config);
if (lastNFileSlices != -1 && lastNFileSlices < 4) {
+ validator.run();
assertFalse(validator.hasValidationFailure());
} else {
- assertTrue(validator.hasValidationFailure());
- assertTrue(validator.getThrowables().get(0) instanceof HoodieValidationException);
+ if (ignoreFailed) {
+ validator.run();
+ assertTrue(validator.hasValidationFailure());
+ assertTrue(validator.getThrowables().get(0) instanceof HoodieValidationException);
+ } else {
+ assertThrows(HoodieValidationException.class, validator::run);
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testAdditionalPartitionsinMdtEndToEnd(boolean ignoreFailed) throws IOException {
+ Map<String, String> writeOptions = new HashMap<>();
+ writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+ writeOptions.put("hoodie.table.name", "test_table");
+ writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+ writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+ writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+ writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),"partition_path");
+ writeOptions.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "2");
+
+ Dataset<Row> inserts = makeInsertDf("000", 100).cache();
+ inserts.write().format("hudi").options(writeOptions)
+ .mode(SaveMode.Overwrite)
+ .save(basePath);
+
+ for (int i = 0; i < 6; i++) {
+ inserts.write().format("hudi").options(writeOptions)
+ .mode(SaveMode.Append)
+ .save(basePath);
+ }
+
+ HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.validateAllFileGroups = true;
+ config.ignoreFailed = ignoreFailed;
+ HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+ validator.run();
+ assertFalse(validator.hasValidationFailure());
+
+ // let's delete one of the partitions, so validation fails
+ FileSystem fs = HadoopFSUtils.getFs(basePath, new Configuration(false));
+ fs.delete(new Path(basePath + "/" + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH));
+
+ config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.ignoreFailed = ignoreFailed;
+
+ HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, config);
+ if (ignoreFailed) {
+ localValidator.run();
+ assertTrue(localValidator.hasValidationFailure());
+ assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+ } else {
+ assertThrows(HoodieValidationException.class, localValidator::run);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ public void testRecordIndexMismatch(boolean ignoreFailed) throws IOException {
+ Map<String, String> writeOptions = new HashMap<>();
+ writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+ writeOptions.put("hoodie.table.name", "test_table");
+ writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "COPY_ON_WRITE");
+ writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+ writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+ writeOptions.put(DataSourceWriteOptions.OPERATION().key(),"bulk_insert");
+ writeOptions.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true");
+
+ Dataset<Row> inserts = makeInsertDf("000", 50).cache();
+ inserts.write().format("hudi").options(writeOptions)
+ .mode(SaveMode.Overwrite)
+ .save(basePath);
+
+ for (int i = 0; i < 6; i++) {
+ makeInsertDf("000", (i + 1) *
100).write().format("hudi").options(writeOptions)
+ .mode(SaveMode.Append)
+ .save(basePath);
+ }
+
+ HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.ignoreFailed = ignoreFailed;
+ HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+ HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+ HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(engineContext.getStorageConf()).build();
+
+ validator.run();
+ assertFalse(validator.hasValidationFailure());
+
+ // lets override one of the latest base file w/ another. so that file slice validation succeeds, but record index comparison fails.
+ HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+ metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), false);
+ List<HoodieBaseFile> allBaseFiles = fsView.getLatestBaseFiles(StringUtils.EMPTY_STRING).collect(Collectors.toList());
+
+ FileSystem fs = HadoopFSUtils.getFs(basePath, new Configuration(false));
+ fs.copyFromLocalFile(
+ new Path(allBaseFiles.get(0).getStoragePath().toString()),
+ new Path(allBaseFiles.get(1).getStoragePath().toString()));
+
+ config = new HoodieMetadataTableValidator.Config();
+ config.basePath = "file:" + basePath;
+ config.validateLatestFileSlices = true;
+ config.validateRecordIndexContent = true;
+ config.ignoreFailed = ignoreFailed;
+
+ HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, config);
+ if (ignoreFailed) {
+ localValidator.run();
+ assertTrue(localValidator.hasValidationFailure());
+ assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+ } else {
+ assertThrows(HoodieValidationException.class, localValidator::run);
}
}