This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 07f271551153ebd6b19058d652ad26bf0b111215
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Apr 9 21:58:22 2024 -0700

    [HUDI-7556] Fixing false positive validation with MDT validator (#10986)
---
 .../utilities/HoodieMetadataTableValidator.java    |  96 +++++++++------
 .../TestHoodieMetadataTableValidator.java          | 131 ++++++++++++++++++++-
 2 files changed, 187 insertions(+), 40 deletions(-)
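The false positive fixed here comes down to a java.util.List API mix-up: the validator called remove(Object) with a whole List argument where the element-wise removeAll(Collection) was intended, so the "additional partitions" lists were never actually pruned. A minimal standalone sketch of the difference (illustrative names only, not part of the patch):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RemoveVsRemoveAll {
      public static void main(String[] args) {
        List<String> fromFS = Arrays.asList("p1", "p2");
        List<String> fromMDT = Arrays.asList("p1", "p2");

        List<String> additionalFromFS = new ArrayList<>(fromFS);
        // remove(Object) looks for a single element equal to its argument;
        // no String element ever equals a List, so this is a silent no-op.
        additionalFromFS.remove(fromMDT);
        System.out.println(additionalFromFS);  // [p1, p2] -> spurious "additional" partitions

        additionalFromFS = new ArrayList<>(fromFS);
        // removeAll(Collection) subtracts element-wise, which is the intent.
        additionalFromFS.removeAll(fromMDT);
        System.out.println(additionalFromFS);  // [] -> FS and MDT agree
      }
    }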

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index bbe8610abe3..924132df2da 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -52,6 +52,7 @@ import org.apache.hudi.common.util.ConfigUtils;
 import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ParquetUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
@@ -514,7 +515,9 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
 
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-    List<String> allPartitions = validatePartitions(engineContext, basePath);
+    // compare partitions
+
+    List<String> allPartitions = validatePartitions(engineContext, basePath, metaClient);
 
     if (allPartitions.isEmpty()) {
       LOG.warn("The result of getting all partitions is null or empty, skip 
current validation. {}", taskLabels);
@@ -612,39 +615,14 @@ public class HoodieMetadataTableValidator implements Serializable {
   /**
   * Compare the listing partitions result between metadata table and fileSystem.
    */
-  private List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath) {
+  @VisibleForTesting
+  List<String> validatePartitions(HoodieSparkEngineContext engineContext, String basePath, HoodieTableMetaClient metaClient) {
     // compare partitions
-    List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false, cfg.assumeDatePartitioning);
     HoodieTimeline completedTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
+    List<String> allPartitionPathsFromFS = getPartitionsFromFileSystem(engineContext, basePath, metaClient.getFs(),
+        completedTimeline);
 
-    // ignore partitions created by uncommitted ingestion.
-    allPartitionPathsFromFS = allPartitionPathsFromFS.stream().parallel().filter(part -> {
-      HoodiePartitionMetadata hoodiePartitionMetadata =
-          new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, part));
-
-      Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
-      if (instantOption.isPresent()) {
-        String instantTime = instantOption.get();
-        // There are two cases where the created commit time is written to the partition metadata:
-        // (1) Commit C1 creates the partition and C1 succeeds, the partition metadata has C1 as
-        // the created commit time.
-        // (2) Commit C1 creates the partition, the partition metadata is written, and C1 fails
-        // during writing data files.  Next time, C2 adds new data to the same partition after C1
-        // is rolled back. In this case, the partition metadata still has C1 as the created commit
-        // time, since Hudi does not rewrite the partition metadata in C2.
-        if (!completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
-          Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
-          return lastInstant.isPresent()
-              && HoodieTimeline.compareTimestamps(
-              instantTime, LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
-        }
-        return true;
-      } else {
-        return false;
-      }
-    }).collect(Collectors.toList());
-
-    List<String> allPartitionPathsMeta = FSUtils.getAllPartitionPaths(engineContext, basePath, true, cfg.assumeDatePartitioning);
+    List<String> allPartitionPathsMeta = getPartitionsFromMDT(engineContext, basePath);
 
     Collections.sort(allPartitionPathsFromFS);
     Collections.sort(allPartitionPathsMeta);
@@ -652,26 +630,23 @@ public class HoodieMetadataTableValidator implements Serializable {
     if (allPartitionPathsFromFS.size() != allPartitionPathsMeta.size()
         || !allPartitionPathsFromFS.equals(allPartitionPathsMeta)) {
       List<String> additionalFromFS = new ArrayList<>(allPartitionPathsFromFS);
-      additionalFromFS.remove(allPartitionPathsMeta);
+      additionalFromFS.removeAll(allPartitionPathsMeta);
       List<String> additionalFromMDT = new ArrayList<>(allPartitionPathsMeta);
-      additionalFromMDT.remove(allPartitionPathsFromFS);
+      additionalFromMDT.removeAll(allPartitionPathsFromFS);
       boolean misMatch = true;
       List<String> actualAdditionalPartitionsInMDT = new ArrayList<>(additionalFromMDT);
       if (additionalFromFS.isEmpty() && !additionalFromMDT.isEmpty()) {
         // there is a chance that when we polled MDT there could have been a new completed commit which was not complete when we polled FS based
         // listing. let's rule that out.
         additionalFromMDT.forEach(partitionFromDMT -> {
-
-          HoodiePartitionMetadata hoodiePartitionMetadata =
-              new HoodiePartitionMetadata(metaClient.getFs(), FSUtils.getPartitionPath(basePath, partitionFromDMT));
-          Option<String> partitionCreationTimeOpt = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+          Option<String> partitionCreationTimeOpt = getPartitionCreationInstant(metaClient.getFs(), basePath, partitionFromDMT);
           // if creation time is greater than last completed instant in active timeline, we can ignore the additional partition from MDT.
           if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
             Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
             if (lastInstant.isPresent()
                 && HoodieTimeline.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().getTimestamp())) {
               LOG.warn("Ignoring additional partition " + partitionFromDMT + 
", as it was deduced to be part of a "
-                  + "latest completed commit which was inflighht when FS based 
listing was polled.");
+                  + "latest completed commit which was inflight when FS based 
listing was polled.");
               actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
             }
           }
@@ -689,10 +664,53 @@ public class HoodieMetadataTableValidator implements Serializable {
         throw new HoodieValidationException(message);
       }
     }
-
     return allPartitionPathsMeta;
   }
 
+  @VisibleForTesting
+  Option<String> getPartitionCreationInstant(FileSystem fs, String basePath, String partition) {
+    HoodiePartitionMetadata hoodiePartitionMetadata =
+        new HoodiePartitionMetadata(fs, FSUtils.getPartitionPath(basePath, partition));
+    return hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+  }
+
+  @VisibleForTesting
+  List<String> getPartitionsFromMDT(HoodieEngineContext engineContext, String basePath) {
+    return FSUtils.getAllPartitionPaths(engineContext, basePath, true);
+  }
+
+  @VisibleForTesting
+  List<String> getPartitionsFromFileSystem(HoodieEngineContext engineContext, String basePath,
+                                           FileSystem fs, HoodieTimeline completedTimeline) {
+    List<String> allPartitionPathsFromFS = FSUtils.getAllPartitionPaths(engineContext, basePath, false);
+
+    // ignore partitions created by uncommitted ingestion.
+    return allPartitionPathsFromFS.stream().parallel().filter(part -> {
+      HoodiePartitionMetadata hoodiePartitionMetadata =
+          new HoodiePartitionMetadata(fs, FSUtils.getPartitionPath(basePath, part));
+      Option<String> instantOption = hoodiePartitionMetadata.readPartitionCreatedCommitTime();
+      if (instantOption.isPresent()) {
+        String instantTime = instantOption.get();
+        // There are two cases where the created commit time is written to the partition metadata:
+        // (1) Commit C1 creates the partition and C1 succeeds, the partition metadata has C1 as
+        // the created commit time.
+        // (2) Commit C1 creates the partition, the partition metadata is written, and C1 fails
+        // during writing data files.  Next time, C2 adds new data to the same partition after C1
+        // is rolled back. In this case, the partition metadata still has C1 as the created commit
+        // time, since Hudi does not rewrite the partition metadata in C2.
+        if (!completedTimeline.containsOrBeforeTimelineStarts(instantTime)) {
+          Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+          return lastInstant.isPresent()
+              && HoodieTimeline.compareTimestamps(
+              instantTime, LESSER_THAN_OR_EQUALS, lastInstant.get().getTimestamp());
+        }
+        return true;
+      } else {
+        return false;
+      }
+    }).collect(Collectors.toList());
+  }
+
   /**
   * Compare the file listing and index data between metadata table and fileSystem.
    * For now, validate five kinds of apis:
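The listing and partition-metadata reads above are extracted into package-private @VisibleForTesting methods precisely so the test below can subclass the validator and stub the storage-facing calls. A reduced sketch of that seam pattern (simplified types, not the actual Hudi signatures):

    import java.util.Arrays;
    import java.util.List;

    public class SeamPatternDemo {
      public static void main(String[] args) {
        // false: the stubbed MDT lists p3 while the stubbed FS view does not.
        System.out.println(new StubValidator().partitionsMatch());
      }
    }

    class PartitionValidator {
      // Overridable seams: production code reads storage, tests return canned lists.
      List<String> getPartitionsFromFileSystem() {
        throw new UnsupportedOperationException("reads the file system in production");
      }

      List<String> getPartitionsFromMDT() {
        throw new UnsupportedOperationException("reads the metadata table in production");
      }

      boolean partitionsMatch() {
        return getPartitionsFromFileSystem().equals(getPartitionsFromMDT());
      }
    }

    class StubValidator extends PartitionValidator {
      @Override
      List<String> getPartitionsFromFileSystem() {
        return Arrays.asList("p1", "p2");
      }

      @Override
      List<String> getPartitionsFromMDT() {
        return Arrays.asList("p1", "p2", "p3");
      }
    }

This is the same shape as MockHoodieMetadataTableValidator in the test file that follows: the comparison logic in validatePartitions is exercised without a real table or storage access.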
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index e87f6257c54..131a96329f0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -19,16 +19,32 @@
 package org.apache.hudi.utilities;
 
 import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.TimeGenerator;
+import org.apache.hudi.common.table.timeline.TimeGenerators;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieValidationException;
 import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -36,14 +52,18 @@ import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase {
 
   @Test
   public void testMetadataTableValidation() {
 
-    Map<String,String> writeOptions = new HashMap<>();
+    Map<String, String> writeOptions = new HashMap<>();
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put("hoodie.table.name", "test_table");
     writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
@@ -73,6 +93,115 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     assertTrue(validator.getThrowables().isEmpty());
   }
 
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAdditionalPartitionsInMDT(boolean testFailureCase) throws InterruptedException {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
+
+    // constructor of HoodieMetadataTableValidator instantiates HoodieTableMetaClient, hence we create an actual table; the rest of the test is mocked.
+    Dataset<Row> inserts = makeInsertDf("000", 5).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .option(DataSourceWriteOptions.OPERATION().key(), WriteOperationType.BULK_INSERT.value())
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    MockHoodieMetadataTableValidator validator = new MockHoodieMetadataTableValidator(jsc, config);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+
+    String partition1 = "PARTITION1";
+    String partition2 = "PARTITION2";
+    String partition3 = "PARTITION3";
+
+    // mock list of partitions to return from MDT to have 1 additional partition compared to FS based listing.
+    List<String> mdtPartitions = Arrays.asList(partition1, partition2, partition3);
+    validator.setMetadataPartitionsToReturn(mdtPartitions);
+    List<String> fsPartitions = Arrays.asList(partition1, partition2);
+    validator.setFsPartitionsToReturn(fsPartitions);
+
+    // mock completed timeline.
+    HoodieTimeline commitsTimeline = mock(HoodieTimeline.class);
+    HoodieTimeline completedTimeline = mock(HoodieTimeline.class);
+    when(metaClient.getCommitsTimeline()).thenReturn(commitsTimeline);
+    when(commitsTimeline.filterCompletedInstants()).thenReturn(completedTimeline);
+
+    TimeGenerator timeGenerator = TimeGenerators
+        .getTimeGenerator(HoodieTimeGeneratorConfig.defaultConfig(basePath), jsc.hadoopConfiguration());
+
+    if (testFailureCase) {
+      // 3rd partition which is additional in MDT should have creation time before last instant in timeline.
+
+      String partition3CreationTime = HoodieActiveTimeline.createNewInstantTime(true, timeGenerator);
+      Thread.sleep(100);
+      String lastInstantCreationTime = HoodieActiveTimeline.createNewInstantTime(true, timeGenerator);
+
+      HoodieInstant lastInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, lastInstantCreationTime);
+      when(completedTimeline.lastInstant()).thenReturn(Option.of(lastInstant));
+      validator.setPartitionCreationTime(Option.of(partition3CreationTime));
+      // validate that exception is thrown since MDT has one additional partition.
+      assertThrows(HoodieValidationException.class, () -> {
+        validator.validatePartitions(engineContext, basePath, metaClient);
+      });
+    } else {
+      // 3rd partition creation time is > last completed instant
+      HoodieInstant lastInstant = new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, HoodieActiveTimeline.createNewInstantTime(true, timeGenerator));
+      when(completedTimeline.lastInstant()).thenReturn(Option.of(lastInstant));
+      Thread.sleep(100);
+      validator.setPartitionCreationTime(Option.of(HoodieActiveTimeline.createNewInstantTime(true, timeGenerator)));
+
+      // validate that all 3 partitions are returned
+      assertEquals(mdtPartitions, validator.validatePartitions(engineContext, basePath, metaClient));
+    }
+  }
+
+  class MockHoodieMetadataTableValidator extends HoodieMetadataTableValidator {
+
+    private List<String> metadataPartitionsToReturn;
+    private List<String> fsPartitionsToReturn;
+    private Option<String> partitionCreationTime;
+
+    public MockHoodieMetadataTableValidator(JavaSparkContext jsc, Config cfg) {
+      super(jsc, cfg);
+    }
+
+    void setMetadataPartitionsToReturn(List<String> metadataPartitionsToReturn) {
+      this.metadataPartitionsToReturn = metadataPartitionsToReturn;
+    }
+
+    void setFsPartitionsToReturn(List<String> fsPartitionsToReturn) {
+      this.fsPartitionsToReturn = fsPartitionsToReturn;
+    }
+
+    void setPartitionCreationTime(Option<String> partitionCreationTime) {
+      this.partitionCreationTime = partitionCreationTime;
+    }
+
+    @Override
+    List<String> getPartitionsFromFileSystem(HoodieEngineContext engineContext, String basePath, FileSystem fs, HoodieTimeline completedTimeline) {
+      return fsPartitionsToReturn;
+    }
+
+    @Override
+    List<String> getPartitionsFromMDT(HoodieEngineContext engineContext, String basePath) {
+      return metadataPartitionsToReturn;
+    }
+
+    @Override
+    Option<String> getPartitionCreationInstant(FileSystem fs, String basePath, String partition) {
+      return this.partitionCreationTime;
+    }
+  }
+
   protected Dataset<Row> makeInsertDf(String instantTime, Integer n) {
     List<String> records = dataGen.generateInserts(instantTime, n).stream()
         .map(r -> recordToString(r).get()).collect(Collectors.toList());

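For readers following the inflight-commit guard in the patch: MDT may legitimately list one more partition than the file-system snapshot when a commit completes between the two polls, so the validator ignores an MDT-only partition whose creation instant is newer than the last instant that was completed when the FS listing was taken. A toy version of that check (string-compared instants; Hudi instant times are fixed-width timestamps, so lexicographic order tracks chronological order; this is a simplification, not the validator's actual code):

    import java.util.Optional;

    public class InflightPartitionCheck {
      // True when an MDT-only partition can be ignored: it was created by a
      // commit that completed after the FS listing snapshot was taken.
      static boolean canIgnore(Optional<String> partitionCreationInstant, String lastCompletedInstant) {
        return partitionCreationInstant
            .map(t -> t.compareTo(lastCompletedInstant) > 0)
            .orElse(false);
      }

      public static void main(String[] args) {
        System.out.println(canIgnore(Optional.of("20240409215900"), "20240409215800"));  // true: created after snapshot
        System.out.println(canIgnore(Optional.of("20240409215700"), "20240409215800"));  // false: genuinely extra in MDT
      }
    }

The parameterized test above drives exactly these two branches: the failure case stubs a creation instant older than the last completed instant and expects HoodieValidationException, while the success case stubs a newer creation instant and expects the extra partition to be ignored.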