This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new a23c5b783e7 [HUDI-7556] Fixing false positive validation with MDT validator (#10986)
a23c5b783e7 is described below
commit a23c5b783e70e31bb269e2dd22604cd34928d162
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Apr 9 21:58:22 2024 -0700
[HUDI-7556] Fixing false positive validation with MDT validator (#10986)
---
.../utilities/HoodieMetadataTableValidator.java | 96 +++++++++------
.../TestHoodieMetadataTableValidator.java | 131 ++++++++++++++++++++-
2 files changed, 187 insertions(+), 40 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 06dbde8b108..1df0d11f4f9 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -52,6 +52,7 @@ import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
@@ -514,7 +515,9 @@ public class HoodieMetadataTableValidator implements Serializable {
    }
    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-    List<String> allPartitions = validatePartitions(engineContext, basePath);
+    // compare partitions
+
+    List<String> allPartitions = validatePartitions(engineContext, basePath, metaClient);
    if (allPartitions.isEmpty()) {
      LOG.warn("The result of getting all partitions is null or empty, skip current validation. {}", taskLabels);
@@ -612,39 +615,14 @@ public class HoodieMetadataTableValidator implements Serializable {
  /**
   * Compare the listing partitions result between metadata table and fileSystem.
   */
-  private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
+  @VisibleForTesting
+  List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath, HoodieTableMetaClient metaClient) {
    // compare partitions
-    List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false);
    HoodieTimeline completedTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
+    List<String> allPartitionPathsFromFS = getPartitionsFromFileSystem(engineContext, basePath, metaClient.getFs(),
+        completedTimeline);
-    // ignore partitions created by uncommitted ingestion.
-    allPartitionPathsFromFS = allPartitionPathsFromFS.stream().parallel().filter(part -> {
-      HoodiePartitionMetadata hoodiePartitionMetadata =
-          new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, part));
-
-      Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
-      if (instantOption.isPresent()) {
-        String instantTime = instantOption.get();
-        // There are two cases where the created commit time is written to the partition metadata:
-        // (1) Commit C1 creates the partition and C1 succeeds, the partition metadata has C1 as
-        // the created commit time.
-        // (2) Commit C1 creates the partition, the partition metadata is written, and C1 fails
-        // during writing data files. Next time, C2 adds new data to the same partition after C1
-        // is rolled back. In this case, the partition metadata still has C1 as the created commit
-        // time, since Hudi does not rewrite the partition metadata in C2.
-        if (!completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
-          Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
-          return lastInstant.isPresent()
-              && HoodieTimeline.compareTimestamps(
-              instantTime, LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
-        }
-        return true;
-      } else {
-        return false;
-      }
-    }).collect(Collectors.toList());
-
-    List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true);
+    List<String> allPartitionPathsMeta = getPartitionsFromMDT(engineContext, basePath);
    Collections.sort(allPartitionPathsFromFS);
    Collections.sort(allPartitionPathsMeta);
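
[Editor's note] The refactor above replaces the inline FSUtils calls with the
package-private seams getPartitionsFromFileSystem / getPartitionsFromMDT so
that tests can substitute canned listings without touching storage. A minimal
self-contained sketch of the same extract-and-override pattern (all names here
are hypothetical, not Hudi APIs):

    import java.util.Arrays;
    import java.util.List;

    class Validator {
      // seam: hits the file system in production; tests override it
      List<String> listPartitionsFromFs(String basePath) {
        throw new UnsupportedOperationException("real listing not wired up in this sketch");
      }

      boolean validate(String basePath, List<String> expected) {
        return listPartitionsFromFs(basePath).equals(expected);
      }
    }

    public class SeamDemo {
      public static void main(String[] args) {
        Validator stubbed = new Validator() {
          @Override
          List<String> listPartitionsFromFs(String basePath) {
            return Arrays.asList("p1", "p2"); // canned listing, no I/O
          }
        };
        System.out.println(stubbed.validate("/tmp/tbl", Arrays.asList("p1", "p2"))); // prints true
      }
    }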
@@ -652,26 +630,23 @@ public class HoodieMetadataTableValidator implements Serializable {
    if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size()
        || !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
      List<String> additionalFromFS = new ArrayList<>(allPartitionPathsFromFS);
-      additionalFromFS.remove(allPartitionPathsMeta);
+      additionalFromFS.removeAll(allPartitionPathsMeta);
      List<String> additionalFromMDT = new ArrayList<>(allPartitionPathsMeta);
-      additionalFromMDT.remove(allPartitionPathsFromFS);
+      additionalFromMDT.removeAll(allPartitionPathsFromFS);
      boolean misMatch = true;
      List<String> actualAdditionalPartitionsInMDT = new ArrayList<>(additionalFromMDT);
      if (additionalFromFS.isEmpty() && !additionalFromMDT.isEmpty()) {
        // There is a chance that, when we polled MDT, there was a newly completed commit that was not yet complete when we polled the FS based
        // listing. Let's rule that out.
        additionalFromMDT.forEach(partitionFromDMT -> {
-
-          HoodiePartitionMetadata hoodiePartitionMetadata =
-              new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, partitionFromDMT));
-          Option<String> partitionCreationTimeOpt = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+          Option<String> partitionCreationTimeOpt = getPartitionCreationInstant(metaClient.getFs(), basePath, partitionFromDMT);
          // if the creation time is greater than the last completed instant in the active timeline, we can ignore the additional partition from MDT.
          if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
            Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
            if (lastInstant.isPresent()
                && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().getTimestamp())) {
              LOG.warn("Ignoring additional partition " + partitionFromDMT + ", as it was deduced to be part of a "
-                  + "latest completed commit which was inflighht when FS based listing was polled.");
+                  + "latest completed commit which was inflight when FS based listing was polled.");
              actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
            }
          }
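
[Editor's note] The removeAll change above is the core of the false-positive
fix: java.util.List.remove(Object) treats its argument as a single element to
delete, so passing a whole List is a silent no-op, and every partition then
looked "additional". removeAll(Collection) performs the intended element-wise
subtraction. A minimal demonstration:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RemoveVsRemoveAll {
      public static void main(String[] args) {
        List<String> fromFs = new ArrayList<>(Arrays.asList("p1", "p2"));
        List<String> fromMdt = new ArrayList<>(Arrays.asList("p1", "p2", "p3"));

        List<String> extra = new ArrayList<>(fromMdt);
        extra.remove(fromFs);      // no-op: the List object itself is not an element
        System.out.println(extra); // [p1, p2, p3] -- everything looks additional

        extra = new ArrayList<>(fromMdt);
        extra.removeAll(fromFs);   // element-wise subtraction
        System.out.println(extra); // [p3] -- only the truly additional partition
      }
    }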
@@ -689,10 +664,53 @@ public class HoodieMetadataTableValidator implements Serializable {
        throw new HoodieValidationException(message);
      }
    }
-
    return allPartitionPathsMeta;
  }

+  @VisibleForTesting
+  Option<String> getPartitionCreationInstant(FileSystem fs, String basePath, String partition) {
+    HoodiePartitionMetadata hoodiePartitionMetadata =
+        new HoodiePartitionMetadata(fs, FSUtils.getPartitionPath(basePath, partition));
+    return hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+  }
+
+  @VisibleForTesting
+  List<String> getPartitionsFromMDT(HoodieEngineContext engineContext, String basePath) {
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, true);
+  }
+
+  @VisibleForTesting
+  List<String> getPartitionsFromFileSystem(HoodieEngineContext engineContext, String basePath,
+                                           FileSystem fs, HoodieTimeline completedTimeline) {
+    List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false);
+
+    // ignore partitions created by uncommitted ingestion.
+    return allPartitionPathsFromFS.stream().parallel().filter(part -> {
+      HoodiePartitionMetadata hoodiePartitionMetadata =
+          new HoodiePartitionMetadata(fs, FSUtils.getPartitionPath(basePath, part));
+      Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+      if (instantOption.isPresent()) {
+        String instantTime = instantOption.get();
+        // There are two cases where the created commit time is written to the partition metadata:
+        // (1) Commit C1 creates the partition and C1 succeeds, the partition metadata has C1 as
+        // the created commit time.
+        // (2) Commit C1 creates the partition, the partition metadata is written, and C1 fails
+        // during writing data files. Next time, C2 adds new data to the same partition after C1
+        // is rolled back. In this case, the partition metadata still has C1 as the created commit
+        // time, since Hudi does not rewrite the partition metadata in C2.
+        if (!completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
+          Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+          return lastInstant.isPresent()
+              && HoodieTimeline.compareTimestamps(
+              instantTime, LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }).collect(Collectors.toList());
+  }
+
  /**
   * Compare the file listing and index data between metadata table and fileSystem.
   * For now, validate five kinds of apis:
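
[Editor's note] The filter in getPartitionsFromFileSystem keeps a partition
only when its created-commit time is committed (case 1 in the comment) or is
no later than the last completed instant (case 2, the rolled-back and then
rewritten partition). A simplified plain-Java sketch of that decision, under
the assumption that Hudi instant times are fixed-width timestamps that order
lexicographically, and ignoring the archived-timeline handling:

    import java.util.Optional;
    import java.util.Set;

    public class PartitionFilterSketch {
      static boolean keepPartition(Optional<String> createdTime,
                                   Set<String> completedInstants,
                                   Optional<String> lastCompleted) {
        if (!createdTime.isPresent()) {
          return false; // no partition metadata -> uncommitted ingestion, ignore
        }
        String t = createdTime.get();
        if (completedInstants.contains(t)) {
          return true; // case (1): the creating commit completed
        }
        // case (2): creating commit rolled back, a later commit re-added data
        return lastCompleted.isPresent() && t.compareTo(lastCompleted.get()) <= 0;
      }

      public static void main(String[] args) {
        Set<String> done = Set.of("20240409215822000");
        System.out.println(keepPartition(
            Optional.of("20240409215822000"), done, Optional.of("20240409215822000"))); // true
        System.out.println(keepPartition(
            Optional.of("20240409999999999"), done, Optional.of("20240409215822000"))); // false
        System.out.println(keepPartition(Optional.empty(), done, Optional.empty()));    // false
      }
    }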
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index e87f6257c54..131a96329f0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -19,16 +19,32 @@
package org.apache.hudi.utilities;
import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimeGenerator;
+import org.apache.hudi.common.table.timeline.TimeGenerators;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.testutils.HoodieSparkClientTestBase;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -36,14 +52,18 @@ import java.util.stream.Collectors;
import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase {
  @Test
  public void testMetadataTableValidation() {
-    Map<String,String> writeOptions = new HashMap<>();
+    Map<String, String> writeOptions = new HashMap<>();
    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
    writeOptions.put("hoodie.table.name", "test_table");
    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
@@ -73,6 +93,115 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
    assertTrue(validator.getThrowables().isEmpty());
  }
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+
+    // The constructor of HoodieMetadataTableValidator instantiates HoodieTableMetaClient, hence we create an actual table here; the rest of the test is mocked.
+    Dataset<Row> inserts = makeInsertDf("000", 5).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    MockHoodieMetadataTableValidator validator = new MockHoodieMetadataTableValidator(jsc, config);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+
+    String partition1 = "PARTITION1";
+    String partition2 = "PARTITION2";
+    String partition3 = "PARTITION3";
+
+    // mock the list of partitions from MDT to contain 1 additional partition compared to the FS based listing.
+    List<String> mdtPartitions = Arrays.asList(partition1, partition2, partition3);
+    validator.setMetadataPartitionsToReturn(mdtPartitions);
+    List<String> fsPartitions = Arrays.asList(partition1, partition2);
+    validator.setFsPartitionsToReturn(fsPartitions);
+
+    // mock completed timeline.
+    HoodieTimeline commitsTimeline = mock(HoodieTimeline.class);
+    HoodieTimeline completedTimeline = mock(HoodieTimeline.class);
+    when(metaClient.getCommitsTimeline()).thenReturn(commitsTimeline);
+    when(commitsTimeline.filterCompletedInstants()).thenReturn(completedTimeline);
+
+    TimeGenerator timeGenerator = TimeGenerators
+        .getTimeGenerator(HoodieTimeGeneratorConfig.defaultConfig(basePath), jsc.hadoopConfiguration());
+
+    if (testFailureCase) {
+      // the 3rd partition, which is additional in MDT, has a creation time before the last instant in the timeline.
+      String partition3CreationTime = HoodieActiveTimeline.createNewInstantTime(true, timeGenerator);
+      Thread.sleep(100);
+      String lastInstantCreationTime = HoodieActiveTimeline.createNewInstantTime(true, timeGenerator);
+
+      HoodieInstant lastInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, lastInstantCreationTime);
+      when(completedTimeline.lastInstant()).thenReturn(Option.of(lastInstant));
+      validator.setPartitionCreationTime(Option.of(partition3CreationTime));
+      // validate that an exception is thrown since MDT has one additional partition.
+      assertThrows(HoodieValidationException.class, () -> {
+        validator.validatePartitions(engineContext, basePath, metaClient);
+      });
+    } else {
+      // the 3rd partition's creation time is > the last completed instant
+      HoodieInstant lastInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION,
+          HoodieActiveTimeline.createNewInstantTime(true, timeGenerator));
+      when(completedTimeline.lastInstant()).thenReturn(Option.of(lastInstant));
+      Thread.sleep(100);
+      validator.setPartitionCreationTime(Option.of(HoodieActiveTimeline.createNewInstantTime(true, timeGenerator)));
+
+      // validate that all 3 partitions are returned
+      assertEquals(mdtPartitions, validator.validatePartitions(engineContext, basePath, metaClient));
+    }
+  }
+
+  class MockHoodieMetadataTableValidator extends HoodieMetadataTableValidator {
+
+    private List<String> metadataPartitionsToReturn;
+    private List<String> fsPartitionsToReturn;
+    private Option<String> partitionCreationTime;
+
+    public MockHoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
+      super(jsc, cfg);
+    }
+
+    void setMetadataPartitionsToReturn(List<String> metadataPartitionsToReturn) {
+      this.metadataPartitionsToReturn = metadataPartitionsToReturn;
+    }
+
+    void setFsPartitionsToReturn(List<String> fsPartitionsToReturn) {
+      this.fsPartitionsToReturn = fsPartitionsToReturn;
+    }
+
+    void setPartitionCreationTime(Option<String> partitionCreationTime) {
+      this.partitionCreationTime = partitionCreationTime;
+    }
+
+    @Override
+    List<String> getPartitionsFromFileSystem(HoodieEngineContext engineContext, String basePath, FileSystem fs, HoodieTimeline completedTimeline) {
+      return fsPartitionsToReturn;
+    }
+
+    @Override
+    List<String> getPartitionsFromMDT(HoodieEngineContext engineContext, String basePath) {
+      return metadataPartitionsToReturn;
+    }
+
+    @Override
+    Option<String> getPartitionCreationInstant(FileSystem fs, String basePath, String partition) {
+      return this.partitionCreationTime;
+    }
+  }
+
  protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
    List<String> records = dataGen.generateInserts(instantTime, n).stream()
        .map(r -> recordToString(r).get()).collect(Collectors.toList());