This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 773b385ba6a [HUDI-8337] Fixing failure handling with HoodieMetadataTableValidator (#12247)
773b385ba6a is described below

commit 773b385ba6a01e4909252adaf8b994681e1021ae
Author: Lin Liu <[email protected]>
AuthorDate: Wed Nov 20 18:46:18 2024 -0800

    [HUDI-8337] Fixing failure handling with HoodieMetadataTableValidator (#12247)
    
    
    ---------
    
    Co-authored-by: Sivabalan Narayanan <[email protected]>
---
 .../utilities/HoodieMetadataTableValidator.java    |  80 ++++--
 .../TestHoodieMetadataTableValidator.java          | 275 ++++++++++++++++-----
 2 files changed, 276 insertions(+), 79 deletions(-)
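
For readers skimming the diff below: the change makes HoodieMetadataTableValidator honor its ignoreFailed flag consistently, so validation errors either propagate as HoodieValidationException or are collected for later inspection. A minimal standalone sketch of that contract, using hypothetical stand-in types rather than the actual Hudi classes:

    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical stand-in for HoodieValidationException.
    class ValidationException extends RuntimeException {
      ValidationException(String msg) { super(msg); }
    }

    // Hypothetical stand-in for the validator surface exercised in this patch.
    class ValidatorSketch {
      private final boolean ignoreFailed;
      private final List<Throwable> throwables = new ArrayList<>();

      ValidatorSketch(boolean ignoreFailed) { this.ignoreFailed = ignoreFailed; }

      boolean run(Runnable validation) {
        try {
          validation.run();
          return true;
        } catch (ValidationException ve) {
          if (!ignoreFailed) {
            throw ve; // fail fast: the caller sees the validation error
          }
          throwables.add(ve); // lenient mode: record the failure, report via return value
          return false;
        }
      }

      boolean hasValidationFailure() { return !throwables.isEmpty(); }
    }
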

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 5bdfa31821c..63079544886 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -55,6 +55,7 @@ import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.InstantComparison;
 import org.apache.hudi.common.table.view.FileSystemViewManager;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
@@ -90,6 +91,7 @@ import com.beust.jcommander.Parameter;
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
+import org.apache.spark.SparkException;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -118,6 +120,7 @@ import java.util.TreeSet;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 
 import scala.Tuple2;
@@ -506,6 +509,11 @@ public class HoodieMetadataTableValidator implements Serializable {
         LOG.info(" ****** do hoodie metadata table validation once - {} ******", taskLabels);
         result = doHoodieMetadataTableValidationOnce();
       }
+      return result;
+    } catch (HoodieValidationException ve) {
+      if (!cfg.ignoreFailed) {
+        throw ve;
+      }
     } catch (Exception e) {
      throw new HoodieException("Unable to do hoodie metadata table validation in " + cfg.basePath, e);
     } finally {
@@ -513,8 +521,8 @@ public class HoodieMetadataTableValidator implements Serializable {
       if (asyncMetadataTableValidateService.isPresent()) {
         asyncMetadataTableValidateService.get().shutdown(true);
       }
-      return result;
     }
+    return result;
   }
 
   private boolean doHoodieMetadataTableValidationOnce() {
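
The hunk above moves `return result;` out of the finally block. In Java, a return inside finally completes abruptly and silently drops any in-flight exception, which is exactly the bug being fixed. A standalone illustration, with hypothetical method names:

    class FinallyReturnSketch {
      // Pre-fix shape: the return inside finally discards the exception
      // thrown in the try body, so the caller never sees the failure.
      @SuppressWarnings("finally")
      static boolean swallows() {
        try {
          throw new IllegalStateException("validation failed");
        } finally {
          return true; // exception discarded here
        }
      }

      // Post-fix shape: finally is cleanup-only, so exceptions propagate,
      // and the return runs only when the try completes normally.
      static boolean propagates(Runnable cleanup) {
        boolean result;
        try {
          result = true;
        } finally {
          cleanup.run(); // cleanup only, no return
        }
        return result;
      }
    }
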
@@ -580,7 +588,6 @@ public class HoodieMetadataTableValidator implements Serializable {
     // compare partitions
 
     List<String> allPartitions = validatePartitions(engineContext, basePath, metaClient);
-
     if (allPartitions.isEmpty()) {
       LOG.warn("The result of getting all partitions is null or empty, skip current validation. {}", taskLabels);
       return true;
@@ -650,8 +657,14 @@ public class HoodieMetadataTableValidator implements Serializable {
         LOG.warn("Metadata table validation failed ({}).", taskLabels);
         return false;
       }
-    } catch (HoodieValidationException e) {
-      throw e;
+    } catch (HoodieValidationException validationException) {
+      throw validationException;
+    } catch (SparkException sparkException) {
+      if (sparkException.getCause() instanceof HoodieValidationException) {
+        throw (HoodieValidationException) sparkException.getCause();
+      } else {
+        throw new HoodieValidationException("Unexpected spark failure", sparkException);
+      }
     } catch (Exception e) {
       LOG.warn("Error closing HoodieMetadataValidationContext, "
           + "ignoring the error as the validation is successful.", e);
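
Context for the hunk above: exceptions thrown inside Spark tasks arrive at the driver wrapped in a SparkException, so the validator now inspects getCause() to recover the underlying HoodieValidationException. A hedged sketch of that unwrapping pattern, with stand-in exception types:

    // Hypothetical stand-ins for SparkException and HoodieValidationException.
    class WrapperException extends RuntimeException {
      WrapperException(String msg, Throwable cause) { super(msg, cause); }
    }

    class ValidationFailure extends RuntimeException {
      ValidationFailure(String msg) { super(msg); }
      ValidationFailure(String msg, Throwable cause) { super(msg, cause); }
    }

    class UnwrapSketch {
      static ValidationFailure unwrap(WrapperException outer) {
        // If the task failed validation, surface that error directly;
        // otherwise wrap the whole failure in the expected exception type.
        if (outer.getCause() instanceof ValidationFailure) {
          return (ValidationFailure) outer.getCause();
        }
        return new ValidationFailure("Unexpected failure", outer);
      }
    }
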
@@ -719,30 +732,36 @@ public class HoodieMetadataTableValidator implements Serializable {
       additionalFromFS.removeAll(allPartitionPathsMeta);
       List<String> additionalFromMDT = new ArrayList<>(allPartitionPathsMeta);
       additionalFromMDT.removeAll(allPartitionPathsFromFS);
-      boolean misMatch = true;
+      AtomicBoolean misMatch = new AtomicBoolean(true);
       List<String> actualAdditionalPartitionsInMDT = new ArrayList<>(additionalFromMDT);
       if (additionalFromFS.isEmpty() && !additionalFromMDT.isEmpty()) {
        // there is a chance that when we polled MDT there could have been a new completed commit which was not complete when we polled FS based
         // listing. let's rule that out.
-        additionalFromMDT.forEach(partitionFromDMT -> {
-          Option<String> partitionCreationTimeOpt = getPartitionCreationInstant(metaClient.getStorage(), basePath, partitionFromDMT);
-          // if creation time is greater than last completed instant in active timeline, we can ignore the additional partition from MDT.
-          if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
-            Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
-            if (lastInstant.isPresent()
-                && compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().requestedTime())) {
-              LOG.warn("Ignoring additional partition {}, as it was deduced to be part of a "
-                  + "latest completed commit which was inflight when FS based listing was polled.", partitionFromDMT);
-              actualAdditionalPartitionsInMDT.remove(partitionFromDMT);
+        additionalFromMDT.forEach(partitionFromMDT -> {
+          try {
+            if (metaClient.getStorage().exists(new StoragePath(basePath + "/" + partitionFromMDT))) {
+              Option<String> partitionCreationTimeOpt = getPartitionCreationInstant(metaClient.getStorage(), basePath, partitionFromMDT);
+              // if creation time is greater than last completed instant in active timeline, we can ignore the additional partition from MDT.
+              if (partitionCreationTimeOpt.isPresent() && !completedTimeline.containsInstant(partitionCreationTimeOpt.get())) {
+                Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+                if (lastInstant.isPresent()
+                    && InstantComparison.compareTimestamps(partitionCreationTimeOpt.get(), GREATER_THAN, lastInstant.get().requestedTime())) {
+                  LOG.warn("Ignoring additional partition " + partitionFromMDT + ", as it was deduced to be part of a "
+                      + "latest completed commit which was inflight when FS based listing was polled.");
+                  actualAdditionalPartitionsInMDT.remove(partitionFromMDT);
+                }
+              }
            }
+          } catch (IOException e) {
+            throw new HoodieValidationException("IOException thrown while trying to validate partition match b/w FS based listing and MDT based listing", e);
           }
         });
        // if there is no additional partitions from FS listing and only additional partitions from MDT based listing is due to a new commit, we are good
         if (actualAdditionalPartitionsInMDT.isEmpty()) {
-          misMatch = false;
+          misMatch.set(false);
         }
       }
-      if (misMatch) {
+      if (misMatch.get()) {
        String message = "Compare Partitions Failed! " + " Additional partitions from FS, but missing from MDT : \"" + additionalFromFS
            + "\" and additional partitions from MDT, but missing from FS listing : \"" + actualAdditionalPartitionsInMDT
            + "\".\n All partitions from FS listing " + allPartitionPathsFromFS;
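
A note on the AtomicBoolean change in the hunk above: a lambda passed to forEach can only capture effectively final locals, so a plain boolean local cannot be reassigned inside it; a mutable holder such as AtomicBoolean sidesteps that restriction. A minimal illustration, with a hypothetical predicate:

    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;

    class LambdaCaptureSketch {
      static boolean anyExplained(List<String> partitions) {
        // boolean misMatch = true; ... misMatch = false;  // would not compile inside the lambda
        AtomicBoolean misMatch = new AtomicBoolean(true);
        partitions.forEach(p -> {
          if (isExplainedByNewCommit(p)) { // hypothetical predicate
            misMatch.set(false);           // mutation through the holder is allowed
          }
        });
        return !misMatch.get();
      }

      static boolean isExplainedByNewCommit(String partition) {
        return !partition.isEmpty();
      }
    }
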
@@ -979,7 +998,8 @@ public class HoodieMetadataTableValidator implements Serializable {
         AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema()), metadataTableBasedContext.getMetadataConfig(),
         metaClient, false);
     HoodieData<HoodieMetadataColumnStats> partitionStats =
-        partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(), scala.Option.empty(), false)
+        partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(
+            metadataTableBasedContext.allColumnNameList).asScala().toSeq(), scala.Option.empty(), false)
            // set isTightBound to false since partition stats generated using column stats does not contain the field
            .map(colStat -> HoodieMetadataColumnStats.newBuilder(colStat).setIsTightBound(false).build());
    JavaRDD<HoodieMetadataColumnStats> diffRDD = HoodieJavaRDD.getJavaRDD(partitionStats).subtract(HoodieJavaRDD.getJavaRDD(partitionStatsUsingColStats));
@@ -1068,7 +1088,8 @@ public class HoodieMetadataTableValidator implements Serializable {
     validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters");
   }
 
-  private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+  @VisibleForTesting
+  void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
                                    HoodieTableMetaClient metaClient) {
    if (!metaClient.getTableConfig().isMetadataPartitionAvailable(MetadataPartitionType.RECORD_INDEX)) {
       return;
@@ -1385,8 +1406,11 @@ public class HoodieMetadataTableValidator implements Serializable {
         FileSlice fileSlice1 = fileSliceListFromMetadataTable.get(i);
         FileSlice fileSlice2 = fileSliceListFromFS.get(i);
        if (!Objects.equals(fileSlice1.getFileGroupId(), fileSlice2.getFileGroupId())
-            || !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime())
-            || !Objects.equals(fileSlice1.getBaseFile(), fileSlice2.getBaseFile())) {
+            || !Objects.equals(fileSlice1.getBaseInstantTime(), fileSlice2.getBaseInstantTime())) {
+          mismatch = true;
+          break;
+        }
+        if (!assertBaseFilesEquality(fileSlice1, fileSlice2)) {
           mismatch = true;
           break;
         }
@@ -1412,6 +1436,20 @@ public class HoodieMetadataTableValidator implements Serializable {
     }
   }
 
+  private boolean assertBaseFilesEquality(FileSlice fileSlice1, FileSlice fileSlice2) {
+    if (fileSlice1.getBaseFile().isPresent() && fileSlice2.getBaseFile().isPresent()) {
+      HoodieBaseFile baseFile1 = fileSlice1.getBaseFile().get();
+      HoodieBaseFile baseFile2 = fileSlice2.getBaseFile().get();
+      return baseFile1.getFileName().equals(baseFile2.getFileName()) && baseFile1.getFileId().equals(baseFile2.getFileId())
+          && baseFile1.getFileSize() == baseFile2.getFileSize();
+    } else {
+      if (!fileSlice1.getBaseFile().isPresent() == fileSlice2.getBaseFile().isPresent()) {
+        return false;
+      }
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * Compares committed log files from two file slices.
    *
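
The new assertBaseFilesEquality above treats two absent base files as equal and, when both are present, compares file name, file id, and size. A behaviorally equivalent sketch using java.util.Optional and a hypothetical BaseFile record (the real code uses Hudi's Option and HoodieBaseFile):

    import java.util.Optional;

    class BaseFileEqualitySketch {
      // Hypothetical stand-in for HoodieBaseFile.
      record BaseFile(String fileName, String fileId, long fileSize) {}

      static boolean baseFilesEqual(Optional<BaseFile> a, Optional<BaseFile> b) {
        if (a.isPresent() && b.isPresent()) {
          // both present: compare the identifying fields
          return a.get().fileName().equals(b.get().fileName())
              && a.get().fileId().equals(b.get().fileId())
              && a.get().fileSize() == b.get().fileSize();
        }
        // equal only when both sides are absent; present vs. absent is a mismatch
        return a.isPresent() == b.isPresent();
      }
    }
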
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index cfd40f90a50..166e4e8c5c0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -24,20 +24,23 @@ import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieTimeGeneratorConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieColumnRangeMetadata;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.HoodieFileGroup;
+import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimeGenerator;
 import org.apache.hudi.common.table.timeline.TimeGenerators;
 import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.SerializationUtils;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.exception.HoodieException;
@@ -47,6 +50,7 @@ import org.apache.hudi.hadoop.fs.HadoopFSUtils;
 import org.apache.hudi.storage.HoodieStorage;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.storage.StoragePathInfo;
+import org.apache.hudi.storage.hadoop.HoodieHadoopStorage;
 import org.apache.hudi.testutils.HoodieSparkClientTestBase;
 
 import jodd.io.FileUtil;
@@ -82,10 +86,8 @@ import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.hadoop.fs.FileUtil.copy;
-import static org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
-import static org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_FILE_NAME_GENERATOR;
+import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.apache.hudi.common.testutils.RawTripTestPayload.recordToString;
 import static org.apache.spark.sql.types.DataTypes.IntegerType;
 import static org.apache.spark.sql.types.DataTypes.StringType;
@@ -99,13 +101,7 @@ import static org.mockito.Mockito.when;
 public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase {
 
   private static Stream<Arguments> lastNFileSlicesTestArgs() {
-    return Stream.of(
-        Arguments.of(-1),
-        Arguments.of(1),
-        Arguments.of(3),
-        Arguments.of(4),
-        Arguments.of(5)
-    );
+    return Stream.of(-1, 1, 3, 4, 5).flatMap(i -> Stream.of(Arguments.of(i, true), Arguments.of(i, false)));
   }
 
   private static Stream<Arguments> viewStorageArgs() {
@@ -200,7 +196,7 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
 
     // validate MDT
     HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
-    config.basePath = basePath;
+    config.basePath = "file:" + basePath;
     config.validateLatestFileSlices = true;
     config.validateAllFileGroups = true;
    if (viewStorageTypeForFSListing != null && viewStorageTypeForMDTListing != null) {
@@ -474,7 +470,7 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
 
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
-  public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws InterruptedException {
+  public void testAdditionalPartitionsinMDT(boolean testFailureCase) throws IOException, InterruptedException {
     Map<String, String> writeOptions = new HashMap<>();
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
@@ -489,6 +485,10 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
         .mode(SaveMode.Overwrite)
         .save(basePath);
 
+    String partition1 = "PARTITION1";
+    String partition2 = "PARTITION2";
+    String partition3 = "PARTITION3";
+
     HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
     config.basePath = basePath;
     config.validateLatestFileSlices = true;
@@ -496,10 +496,11 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     MockHoodieMetadataTableValidator validator = new MockHoodieMetadataTableValidator(jsc, config);
     HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
     HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
-
-    String partition1 = "PARTITION1";
-    String partition2 = "PARTITION2";
-    String partition3 = "PARTITION3";
+    HoodieStorage storage = mock(HoodieHadoopStorage.class);
+    when(metaClient.getStorage()).thenReturn(storage);
+    when(storage.exists(new StoragePath(basePath + "/" + partition1))).thenReturn(true);
+    when(storage.exists(new StoragePath(basePath + "/" + partition2))).thenReturn(true);
+    when(storage.exists(new StoragePath(basePath + "/" + partition3))).thenReturn(true);
 
     // mock list of partitions to return from MDT to have 1 additional partition compared to FS based listing.
     List<String> mdtPartitions = Arrays.asList(partition1, partition2, partition3);
@@ -548,44 +549,20 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
 
   @ParameterizedTest
   @MethodSource("lastNFileSlicesTestArgs")
-  public void testAdditionalFilesinMetadata(Integer lastNFileSlices) throws Exception {
+  public void testAdditionalFilesInMetadata(Integer lastNFileSlices, boolean ignoreFailed) throws IOException {
     Map<String, String> writeOptions = new HashMap<>();
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
     writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
     writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
-    writeOptions.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),"2");
+    writeOptions.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "2");
 
     Dataset<Row> inserts = makeInsertDf("000", 10).cache();
     inserts.write().format("hudi").options(writeOptions)
         .mode(SaveMode.Overwrite)
         .save(basePath);
 
-    // Perform updates to generate log files
-    inserts.write().format("hudi").options(writeOptions)
-        .mode(SaveMode.Append)
-        .save(basePath);
-    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
-    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(engineContext.getStorageConf()).build();
-
-    // let's add a log file entry to the commit history and filesystem by directly modifying the commit so FS based listing and MDT based listing diverges.
-    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
-    HoodieInstant instantToOverwrite = timeline.getInstants().get(1);
-    HoodieCommitMetadata commitMetadata = COMMIT_METADATA_SER_DE.deserialize(instantToOverwrite, timeline.getInstantDetails(instantToOverwrite).get(), HoodieCommitMetadata.class);
-    HoodieWriteStat writeStatToCopy = commitMetadata.getPartitionToWriteStats().entrySet().stream().flatMap(entry -> entry.getValue().stream())
-        .filter(writeStat -> FSUtils.isLogFile(writeStat.getPath())).findFirst().get();
-    String newLogFilePath = writeStatToCopy.getPath() + "1";
-    HoodieWriteStat writeStatCopy = SerializationUtils.deserialize(SerializationUtils.serialize(writeStatToCopy));
-    writeStatCopy.setPath(newLogFilePath);
-    commitMetadata.addWriteStat(writeStatCopy.getPartitionPath(), writeStatCopy);
-    FileSystem fs = HadoopFSUtils.getFs(newLogFilePath, new Configuration(false));
-    fs.copyFromLocalFile(new Path(basePath, writeStatToCopy.getPath()), new Path(basePath, newLogFilePath));
-    // remove the existing instant and rewrite with the new metadata
-    assertTrue(fs.delete(new Path(basePath, String.format(".hoodie/%s", INSTANT_FILE_NAME_GENERATOR.getFileName(instantToOverwrite)))));
-    timeline.saveAsComplete(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, instantToOverwrite.getAction(), instantToOverwrite.requestedTime(),
-            instantToOverwrite.getCompletionTime()), serializeCommitMetadata(COMMIT_METADATA_SER_DE, commitMetadata));
-
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 6; i++) {
       inserts.write().format("hudi").options(writeOptions)
           .mode(SaveMode.Append)
           .save(basePath);
@@ -595,15 +572,79 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     config.basePath = "file:" + basePath;
     config.validateLatestFileSlices = true;
     config.validateAllFileGroups = true;
-    config.ignoreFailed = true;
+    config.ignoreFailed = ignoreFailed;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
 
-    HoodieMetadataTableValidator.Config finalConfig = config;
-    HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, finalConfig);
+    validator.run();
+    // assertFalse(validator.hasValidationFailure());
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(engineContext.getStorageConf()).build();
+
+    java.nio.file.Path tempFolderNioPath = tempDir.resolve("temp_folder");
+    java.nio.file.Files.createDirectories(tempFolderNioPath);
+    String tempFolder = tempFolderNioPath.toAbsolutePath().toString();
+    Path tempFolderPath = new Path(tempFolder);
+
+    // lets move one of the log files from latest file slice to the temp dir. so validation w/ latest file slice should fail.
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+        metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), false);
+    FileSlice latestFileSlice = fsView.getLatestFileSlices(StringUtils.EMPTY_STRING).filter(fileSlice -> {
+      return fileSlice.getLogFiles().count() > 0;
+    }).collect(Collectors.toList()).get(0);
+    HoodieLogFile latestLogFile = latestFileSlice.getLogFiles().collect(Collectors.toList()).get(0);
+    FileSystem fs = HadoopFSUtils.getFs(new Path(latestLogFile.getPath().toString()), new Configuration(false));
+    fs.moveFromLocalFile(new Path(latestLogFile.getPath().toString()), tempFolderPath);
+
+    config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.ignoreFailed = ignoreFailed;
+
+    HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, config);
+    if (ignoreFailed) {
+      localValidator.run();
+      assertTrue(localValidator.hasValidationFailure());
+      assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+    } else {
+      assertThrows(HoodieValidationException.class, localValidator::run);
+    }
+
+    // lets move back the log file and validate validation suceeds.
+    fs.moveFromLocalFile(new Path(tempFolderPath + "/" + latestLogFile.getFileName()), new Path(basePath));
+    config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.ignoreFailed = ignoreFailed;
+
+    localValidator = new HoodieMetadataTableValidator(jsc, config);
     localValidator.run();
-    assertTrue(localValidator.hasValidationFailure());
-    assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+    // no exception should be thrown
+
+    // let's delete one of the log files from 1st commit and so FS based listing and MDT based listing diverges when all file slices are validated.
+    fsView = new HoodieTableFileSystemView(
+        metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), false);
+    HoodieFileGroup fileGroup = fsView.getAllFileGroups(StringUtils.EMPTY_STRING).collect(Collectors.toList()).get(0);
+    List<FileSlice> allFileSlices = fileGroup.getAllFileSlices().collect(Collectors.toList());
+    FileSlice earliestFileSlice = allFileSlices.get(allFileSlices.size() - 1);
+    HoodieLogFile earliestLogFile = earliestFileSlice.getLogFiles().collect(Collectors.toList()).get(0);
+    fs.delete(new Path(earliestLogFile.getPath().toString()));
 
-    // lets set lastN file slices to 2 and so validation should succeed. (bcoz, there will be mis-match only on first file slice)
+    config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    config.ignoreFailed = ignoreFailed;
+    HoodieMetadataTableValidator.Config finalConfig = config;
+    localValidator = new HoodieMetadataTableValidator(jsc, finalConfig);
+    if (ignoreFailed) {
+      localValidator.run();
+      assertTrue(localValidator.hasValidationFailure());
+      assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+    } else {
+      assertThrows(HoodieValidationException.class, localValidator::run);
+    }
+
+    // lets set lastN file slices to argument value and so validation should succeed. (bcoz, there will be mis-match only on first file slice)
     config = new HoodieMetadataTableValidator.Config();
     config.basePath = "file:" + basePath;
     config.validateLatestFileSlices = true;
@@ -611,14 +652,132 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     if (lastNFileSlices != -1) {
       config.validateLastNFileSlices = lastNFileSlices;
     }
-    config.ignoreFailed = true;
-    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
-    validator.run();
+    config.ignoreFailed = ignoreFailed;
+    validator = new HoodieMetadataTableValidator(jsc, config);
     if (lastNFileSlices != -1 && lastNFileSlices < 4) {
+      validator.run();
       assertFalse(validator.hasValidationFailure());
     } else {
-      assertTrue(validator.hasValidationFailure());
-      assertTrue(validator.getThrowables().get(0) instanceof HoodieValidationException);
+      if (ignoreFailed) {
+        validator.run();
+        assertTrue(validator.hasValidationFailure());
+        assertTrue(validator.getThrowables().get(0) instanceof HoodieValidationException);
+      } else {
+        assertThrows(HoodieValidationException.class, validator::run);
+      }
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testAdditionalPartitionsinMdtEndToEnd(boolean ignoreFailed) throws IOException {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "MERGE_ON_READ");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),"partition_path");
+    writeOptions.put(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "2");
+
+    Dataset<Row> inserts = makeInsertDf("000", 100).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+
+    for (int i = 0; i < 6; i++) {
+      inserts.write().format("hudi").options(writeOptions)
+          .mode(SaveMode.Append)
+          .save(basePath);
+    }
+
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.validateAllFileGroups = true;
+    config.ignoreFailed = ignoreFailed;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+
+    validator.run();
+    assertFalse(validator.hasValidationFailure());
+
+    // let's delete one of the partitions, so validation fails
+    FileSystem fs = HadoopFSUtils.getFs(basePath, new Configuration(false));
+    fs.delete(new Path(basePath + "/" + HoodieTestDataGenerator.DEFAULT_SECOND_PARTITION_PATH));
+
+    config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.ignoreFailed = ignoreFailed;
+
+    HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, config);
+    if (ignoreFailed) {
+      localValidator.run();
+      assertTrue(localValidator.hasValidationFailure());
+      assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+    } else {
+      assertThrows(HoodieValidationException.class, localValidator::run);
+    }
+  }
+
+  @ParameterizedTest
+  @ValueSource(booleans = {true, false})
+  public void testRecordIndexMismatch(boolean ignoreFailed) throws IOException {
+    Map<String, String> writeOptions = new HashMap<>();
+    writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
+    writeOptions.put("hoodie.table.name", "test_table");
+    writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), "COPY_ON_WRITE");
+    writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
+    writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
+    writeOptions.put(DataSourceWriteOptions.OPERATION().key(),"bulk_insert");
+    writeOptions.put(HoodieMetadataConfig.RECORD_INDEX_ENABLE_PROP.key(), "true");
+
+    Dataset<Row> inserts = makeInsertDf("000", 50).cache();
+    inserts.write().format("hudi").options(writeOptions)
+        .mode(SaveMode.Overwrite)
+        .save(basePath);
+
+    for (int i = 0; i < 6; i++) {
+      makeInsertDf("000", (i + 1) * 100).write().format("hudi").options(writeOptions)
+          .mode(SaveMode.Append)
+          .save(basePath);
+    }
+
+    HoodieMetadataTableValidator.Config config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.ignoreFailed = ignoreFailed;
+    HoodieMetadataTableValidator validator = new HoodieMetadataTableValidator(jsc, config);
+    HoodieSparkEngineContext engineContext = new HoodieSparkEngineContext(jsc);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setBasePath(basePath).setConf(engineContext.getStorageConf()).build();
+
+    validator.run();
+    assertFalse(validator.hasValidationFailure());
+
+    // lets override one of the latest base file w/ another. so that file slice validation succeeds, but record index comparison fails.
+    HoodieTableFileSystemView fsView = new HoodieTableFileSystemView(
+        metaClient, metaClient.getActiveTimeline().filterCompletedAndCompactionInstants(), false);
+    List<HoodieBaseFile> allBaseFiles = fsView.getLatestBaseFiles(StringUtils.EMPTY_STRING).collect(Collectors.toList());
+
+    FileSystem fs = HadoopFSUtils.getFs(basePath, new Configuration(false));
+    fs.copyFromLocalFile(
+        new Path(allBaseFiles.get(0).getStoragePath().toString()),
+        new Path(allBaseFiles.get(1).getStoragePath().toString()));
+
+    config = new HoodieMetadataTableValidator.Config();
+    config.basePath = "file:" + basePath;
+    config.validateLatestFileSlices = true;
+    config.validateRecordIndexContent = true;
+    config.ignoreFailed = ignoreFailed;
+
+    HoodieMetadataTableValidator localValidator = new HoodieMetadataTableValidator(jsc, config);
+    if (ignoreFailed) {
+      localValidator.run();
+      assertTrue(localValidator.hasValidationFailure());
+      assertTrue(localValidator.getThrowables().get(0) instanceof HoodieValidationException);
+    } else {
+      assertThrows(HoodieValidationException.class, localValidator::run);
     }
   }
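
All of the new tests above follow the same two-mode assertion pattern: with ignoreFailed set, run() completes and the failure is inspectable afterwards; without it, run() throws. A condensed JUnit 5 sketch with a hypothetical validator stand-in:

    import static org.junit.jupiter.api.Assertions.assertThrows;
    import static org.junit.jupiter.api.Assertions.assertTrue;

    import java.util.ArrayList;
    import java.util.List;

    class FakeValidationException extends RuntimeException {}

    // Hypothetical stand-in mirroring the validator surface the tests rely on.
    class FakeValidator {
      private final boolean ignoreFailed;
      private final List<Throwable> throwables = new ArrayList<>();

      FakeValidator(boolean ignoreFailed) { this.ignoreFailed = ignoreFailed; }

      void run() {
        FakeValidationException failure = new FakeValidationException(); // pretend validation failed
        if (!ignoreFailed) {
          throw failure;         // strict mode surfaces the exception to the caller
        }
        throwables.add(failure); // lenient mode records it for later inspection
      }

      boolean hasValidationFailure() { return !throwables.isEmpty(); }

      List<Throwable> getThrowables() { return throwables; }
    }

    class AssertionPatternSketch {
      static void checkBothModes() {
        FakeValidator lenient = new FakeValidator(true);
        lenient.run();
        assertTrue(lenient.hasValidationFailure());
        assertTrue(lenient.getThrowables().get(0) instanceof FakeValidationException);

        FakeValidator strict = new FakeValidator(false);
        assertThrows(FakeValidationException.class, strict::run);
      }
    }
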
 
