This is an automated email from the ASF dual-hosted git repository.

pwason pushed a commit to branch release-0.14.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b10f52d85d3aac562141e92a01749dad7ada5e7e
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Aug 15 09:40:43 2023 -0700

    [HUDI-6689] Add record index validation in MDT validator (#9437)
    
    This PR adds the validation of record index in MDT validator 
(`HoodieMetadataTableValidator`).  The following validation modes are added:
    - Record index count validation (with CLI config 
`--validate-record-index-count`): validate the number of entries in the record 
index, which should be equal to the number of record keys in the latest 
snapshot of the table.
    - Record index content validation (with CLI config 
`--validate-record-index-content`): validate the content of the record index so 
that each record key should have the correct location, and there is no 
additional or missing entry.  Two more configs are added for this mode: (1) 
`--num-record-index-error-samples`: number of error samples to show for record 
index validation when there are mismatches, (2) `--record-index-parallelism`: 
parallelism for joining record index entries with data [...]
---
 .../hudi/metadata/HoodieMetadataPayload.java       |  19 +-
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  71 +++++-
 .../utilities/HoodieMetadataTableValidator.java    | 272 +++++++++++++++++++--
 3 files changed, 319 insertions(+), 43 deletions(-)

diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
index 8d5114a76bc..04ffc98e840 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java
@@ -158,7 +158,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
   /**
    * FileIndex value saved in record index record when the fileId has no index 
(old format of base filename)
    */
-  private static final int RECORD_INDEX_MISSING_FILEINDEX_FALLBACK = -1;
+  public static final int RECORD_INDEX_MISSING_FILEINDEX_FALLBACK = -1;
 
   /**
    * NOTE: PLEASE READ CAREFULLY
@@ -761,22 +761,7 @@ public class HoodieMetadataPayload implements 
HoodieRecordPayload<HoodieMetadata
    * If this is a record-level index entry, returns the file to which this is 
mapped.
    */
   public HoodieRecordGlobalLocation getRecordGlobalLocation() {
-    final String partition = recordIndexMetadata.getPartitionName();
-    String fileId = null;
-    if (recordIndexMetadata.getFileIdEncoding() == 0) {
-      // encoding 0 refers to UUID based fileID
-      final UUID uuid = new UUID(recordIndexMetadata.getFileIdHighBits(), 
recordIndexMetadata.getFileIdLowBits());
-      fileId = uuid.toString();
-      if (recordIndexMetadata.getFileIndex() != 
RECORD_INDEX_MISSING_FILEINDEX_FALLBACK) {
-        fileId += "-" + recordIndexMetadata.getFileIndex();
-      }
-    } else {
-      // encoding 1 refers to no encoding. fileID as is.
-      fileId = recordIndexMetadata.getFileId();
-    }
-
-    final java.util.Date instantDate = new 
java.util.Date(recordIndexMetadata.getInstantTime());
-    return new HoodieRecordGlobalLocation(partition, 
HoodieActiveTimeline.formatDate(instantDate), fileId);
+    return 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(recordIndexMetadata);
   }
 
   public boolean isDeleted() {
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 08fc663fbad..57f6b405628 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -18,10 +18,10 @@
 
 package org.apache.hudi.metadata;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.ConvertingGenericData;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieMetadataColumnStats;
+import org.apache.hudi.avro.model.HoodieRecordIndexInfo;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackPlan;
@@ -39,6 +39,7 @@ import org.apache.hudi.common.model.HoodieDeltaWriteStat;
 import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -66,6 +67,7 @@ import org.apache.avro.LogicalTypes;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -89,6 +91,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.function.BiFunction;
 import java.util.function.Function;
 import java.util.stream.Collector;
@@ -105,6 +108,7 @@ import static 
org.apache.hudi.avro.HoodieAvroUtils.unwrapAvroValueWrapper;
 import static 
org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_ID_LENGTH;
 import static org.apache.hudi.common.util.StringUtils.isNullOrEmpty;
 import static org.apache.hudi.common.util.ValidationUtils.checkState;
+import static 
org.apache.hudi.metadata.HoodieMetadataPayload.RECORD_INDEX_MISSING_FILEINDEX_FALLBACK;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.EMPTY_PARTITION_NAME;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.NON_PARTITIONED_NAME;
 import static 
org.apache.hudi.metadata.HoodieTableMetadata.SOLO_COMMIT_TIMESTAMP;
@@ -529,8 +533,8 @@ public class HoodieTableMetadataUtil {
   }
 
   public static Map<MetadataPartitionType, HoodieData<HoodieRecord>> 
convertMissingPartitionRecords(HoodieEngineContext engineContext,
-                                                                        
List<String> deletedPartitions, Map<String, Map<String, Long>> filesAdded,
-                                                                        
Map<String, List<String>> filesDeleted, String instantTime) {
+                                                                               
                     List<String> deletedPartitions, Map<String, Map<String, 
Long>> filesAdded,
+                                                                               
                     Map<String, List<String>> filesDeleted, String 
instantTime) {
     List<HoodieRecord> records = new LinkedList<>();
     int[] fileDeleteCount = {0};
     int[] filesAddedCount = {0};
@@ -1069,8 +1073,8 @@ public class HoodieTableMetadataUtil {
   }
 
   private static Stream<HoodieRecord> 
translateWriteStatToColumnStats(HoodieWriteStat writeStat,
-                                                                     
HoodieTableMetaClient datasetMetaClient,
-                                                                     
List<String> columnsToIndex) {
+                                                                      
HoodieTableMetaClient datasetMetaClient,
+                                                                      
List<String> columnsToIndex) {
     if (writeStat instanceof HoodieDeltaWriteStat && ((HoodieDeltaWriteStat) 
writeStat).getColumnStats().isPresent()) {
       Map<String, HoodieColumnRangeMetadata<Comparable>> columnRangeMap = 
((HoodieDeltaWriteStat) writeStat).getColumnStats().get();
       Collection<HoodieColumnRangeMetadata<Comparable>> 
columnRangeMetadataList = columnRangeMap.values();
@@ -1332,7 +1336,7 @@ public class HoodieTableMetadataUtil {
    */
   public static boolean isIndexingCommit(String instantTime) {
     return instantTime.length() == MILLIS_INSTANT_ID_LENGTH + 
OperationSuffix.METADATA_INDEXER.getSuffix().length()
-            && 
instantTime.endsWith(OperationSuffix.METADATA_INDEXER.getSuffix());
+        && instantTime.endsWith(OperationSuffix.METADATA_INDEXER.getSuffix());
   }
 
   /**
@@ -1457,7 +1461,7 @@ public class HoodieTableMetadataUtil {
 
     if (backup) {
       final Path metadataPartitionBackupPath = new 
Path(metadataTablePartitionPath.getParent().getParent(),
-              String.format(".metadata_%s_%s", 
partitionType.getPartitionPath(), HoodieActiveTimeline.createNewInstantTime()));
+          String.format(".metadata_%s_%s", partitionType.getPartitionPath(), 
HoodieActiveTimeline.createNewInstantTime()));
       LOG.info(String.format("Backing up MDT partition %s to %s before 
deletion", partitionType, metadataPartitionBackupPath));
       try {
         if (fs.rename(metadataTablePartitionPath, 
metadataPartitionBackupPath)) {
@@ -1586,7 +1590,7 @@ public class HoodieTableMetadataUtil {
    * @return The estimated number of file groups.
    */
   public static int estimateFileGroupCount(MetadataPartitionType 
partitionType, long recordCount, int averageRecordSize, int minFileGroupCount,
-      int maxFileGroupCount, float growthFactor, int maxFileGroupSizeBytes) {
+                                           int maxFileGroupCount, float 
growthFactor, int maxFileGroupSizeBytes) {
     int fileGroupCount;
 
     // If a fixed number of file groups are desired
@@ -1640,4 +1644,55 @@ public class HoodieTableMetadataUtil {
     }
     return false;
   }
+
+  /**
+   * Gets the location from record index content.
+   *
+   * @param recordIndexInfo {@link HoodieRecordIndexInfo} instance.
+   * @return {@link HoodieRecordGlobalLocation} containing the location.
+   */
+  public static HoodieRecordGlobalLocation 
getLocationFromRecordIndexInfo(HoodieRecordIndexInfo recordIndexInfo) {
+    return getLocationFromRecordIndexInfo(
+        recordIndexInfo.getPartitionName(), 
recordIndexInfo.getFileIdEncoding(),
+        recordIndexInfo.getFileIdHighBits(), 
recordIndexInfo.getFileIdLowBits(),
+        recordIndexInfo.getFileIndex(), recordIndexInfo.getFileId(),
+        recordIndexInfo.getInstantTime());
+  }
+
+  /**
+   * Gets the location from record index content.
+   * Note that, a UUID based fileId is stored as 3 pieces in record index 
(fileIdHighBits,
+   * fileIdLowBits and fileIndex). FileID format is {UUID}-{fileIndex}.
+   * The arguments are consistent with what {@link HoodieRecordIndexInfo} 
contains.
+   *
+   * @param partition      The partition name the record belongs to.
+   * @param fileIdEncoding FileId encoding. Possible values are 0 and 1. 0 
represents UUID based
+   *                       fileID, and 1 represents raw string format of the 
fileId.
+   * @param fileIdHighBits High 64 bits if the fileId is based on UUID format.
+   * @param fileIdLowBits  Low 64 bits if the fileId is based on UUID format.
+   * @param fileIndex      Index representing file index which is used to 
re-construct UUID based fileID.
+   * @param originalFileId FileId of the location where the record belongs.
+   *                       When the encoding is 1, fileID is stored in raw 
string format.
+   * @param instantTime    Epoch time in millisecond representing the commit 
time at which record was added.
+   * @return {@link HoodieRecordGlobalLocation} containing the location.
+   */
+  public static HoodieRecordGlobalLocation getLocationFromRecordIndexInfo(
+      String partition, int fileIdEncoding, long fileIdHighBits, long 
fileIdLowBits,
+      int fileIndex, String originalFileId, Long instantTime) {
+    String fileId = null;
+    if (fileIdEncoding == 0) {
+      // encoding 0 refers to UUID based fileID
+      final UUID uuid = new UUID(fileIdHighBits, fileIdLowBits);
+      fileId = uuid.toString();
+      if (fileIndex != RECORD_INDEX_MISSING_FILEINDEX_FALLBACK) {
+        fileId += "-" + fileIndex;
+      }
+    } else {
+      // encoding 1 refers to no encoding. fileID as is.
+      fileId = originalFileId;
+    }
+
+    final java.util.Date instantDate = new java.util.Date(instantTime);
+    return new HoodieRecordGlobalLocation(partition, 
HoodieActiveTimeline.formatDate(instantDate), fileId);
+  }
 }
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 29e59df6935..45c12fcfe28 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -36,6 +36,7 @@ import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodiePartitionMetadata;
 import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -68,7 +69,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.schema.MessageType;
 import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.Optional;
+import org.apache.spark.sql.functions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,20 +93,30 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.stream.Collectors;
 
+import scala.Tuple2;
+
+import static 
org.apache.hudi.common.model.HoodieRecord.FILENAME_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.PARTITION_PATH_METADATA_FIELD;
+import static 
org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
 import static 
org.apache.hudi.common.table.log.block.HoodieLogBlock.HeaderMetadataType.INSTANT_TIME;
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN_OR_EQUALS;
 import static 
org.apache.hudi.hadoop.CachingPath.getPathWithoutSchemeAndAuthority;
+import static 
org.apache.hudi.metadata.HoodieTableMetadata.getMetadataTableBasePath;
 
 /**
  * A validator with spark-submit to compare information, such as partitions, 
file listing, index, etc.,
  * between metadata table and filesystem.
  * <p>
- * There are five validation tasks, that can be enabled independently through 
the following CLI options:
+ * There are seven validation tasks, that can be enabled independently through 
the following CLI options:
  * - `--validate-latest-file-slices`: validate the latest file slices for all 
partitions.
  * - `--validate-latest-base-files`: validate the latest base files for all 
partitions.
  * - `--validate-all-file-groups`: validate all file groups, and all file 
slices within file groups.
  * - `--validate-all-column-stats`: validate column stats for all columns in 
the schema
  * - `--validate-bloom-filters`: validate bloom filters of base files
+ * - `--validate-record-index-count`: validate the number of entries in the 
record index, which
+ * should be equal to the number of record keys in the latest snapshot of the 
table.
+ * - `--validate-record-index-content`: validate the content of the record 
index so that each
+ * record key should have the correct location, and there is no additional or 
missing entry.
  * <p>
  * If the Hudi table is on the local file system, the base path passed to 
`--base-path` must have
  * "file:" prefix to avoid validation failure.
@@ -194,6 +208,12 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     if (cfg.validateBloomFilters) {
       labelList.add("validate-bloom-filters");
     }
+    if (cfg.validateRecordIndexCount) {
+      labelList.add("validate-record-index-count");
+    }
+    if (cfg.validateRecordIndexContent) {
+      labelList.add("validate-record-index-content");
+    }
     return String.join(",", labelList);
   }
 
@@ -235,6 +255,23 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     @Parameter(names = {"--validate-bloom-filters"}, description = "Validate 
bloom filters of base files", required = false)
     public boolean validateBloomFilters = false;
 
+    @Parameter(names = {"--validate-record-index-count"},
+        description = "Validate the number of entries in the record index, 
which should be equal "
+            + "to the number of record keys in the latest snapshot of the 
table",
+        required = false)
+    public boolean validateRecordIndexCount = false;
+
+    @Parameter(names = {"--validate-record-index-content"},
+        description = "Validate the content of the record index so that each 
record key should "
+            + "have the correct location, and there is no additional or 
missing entry",
+        required = false)
+    public boolean validateRecordIndexContent = false;
+
+    @Parameter(names = {"--num-record-index-error-samples"},
+        description = "Number of error samples to show for record index 
validation",
+        required = false)
+    public int numRecordIndexErrorSamples = 100;
+
     @Parameter(names = {"--min-validate-interval-seconds"},
         description = "the min validate interval of each validate when set 
--continuous, default is 10 minutes.")
     public Integer minValidateIntervalSeconds = 10 * 60;
@@ -242,6 +279,9 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     @Parameter(names = {"--parallelism", "-pl"}, description = "Parallelism 
for valuation", required = false)
     public int parallelism = 200;
 
+    @Parameter(names = {"--record-index-parallelism", "-rpl"}, description = 
"Parallelism for validating record index", required = false)
+    public int recordIndexParallelism = 100;
+
     @Parameter(names = {"--ignore-failed", "-ig"}, description = "Ignore 
metadata validate failure and continue.", required = false)
     public boolean ignoreFailed = false;
 
@@ -276,11 +316,15 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           + "   --validate-all-file-groups " + validateAllFileGroups + ", \n"
           + "   --validate-all-column-stats " + validateAllColumnStats + ", \n"
           + "   --validate-bloom-filters " + validateBloomFilters + ", \n"
+          + "   --validate-record-index-count " + validateRecordIndexCount + 
", \n"
+          + "   --validate-record-index-content " + validateRecordIndexContent 
+ ", \n"
+          + "   --num-record-index-error-samples " + 
numRecordIndexErrorSamples + ", \n"
           + "   --continuous " + continuous + ", \n"
           + "   --skip-data-files-for-cleaning " + skipDataFilesForCleaning + 
", \n"
           + "   --ignore-failed " + ignoreFailed + ", \n"
           + "   --min-validate-interval-seconds " + minValidateIntervalSeconds 
+ ", \n"
           + "   --parallelism " + parallelism + ", \n"
+          + "   --record-index-parallelism " + recordIndexParallelism + ", \n"
           + "   --spark-master " + sparkMaster + ", \n"
           + "   --spark-memory " + sparkMemory + ", \n"
           + "   --assumeDatePartitioning-memory " + assumeDatePartitioning + 
", \n"
@@ -306,8 +350,12 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           && Objects.equals(validateAllFileGroups, 
config.validateAllFileGroups)
           && Objects.equals(validateAllColumnStats, 
config.validateAllColumnStats)
           && Objects.equals(validateBloomFilters, config.validateBloomFilters)
+          && Objects.equals(validateRecordIndexCount, 
config.validateRecordIndexCount)
+          && Objects.equals(validateRecordIndexContent, 
config.validateRecordIndexContent)
+          && Objects.equals(numRecordIndexErrorSamples, 
config.numRecordIndexErrorSamples)
           && Objects.equals(minValidateIntervalSeconds, 
config.minValidateIntervalSeconds)
           && Objects.equals(parallelism, config.parallelism)
+          && Objects.equals(recordIndexParallelism, 
config.recordIndexParallelism)
           && Objects.equals(ignoreFailed, config.ignoreFailed)
           && Objects.equals(sparkMaster, config.sparkMaster)
           && Objects.equals(sparkMemory, config.sparkMemory)
@@ -318,9 +366,11 @@ public class HoodieMetadataTableValidator implements 
Serializable {
 
     @Override
     public int hashCode() {
-      return Objects.hash(basePath, continuous, skipDataFilesForCleaning, 
validateLatestFileSlices, validateLatestBaseFiles,
-          validateAllFileGroups, validateAllColumnStats, validateBloomFilters, 
minValidateIntervalSeconds,
-          parallelism, ignoreFailed, sparkMaster, sparkMemory, 
assumeDatePartitioning, propsFilePath, configs, help);
+      return Objects.hash(basePath, continuous, skipDataFilesForCleaning, 
validateLatestFileSlices,
+          validateLatestBaseFiles, validateAllFileGroups, 
validateAllColumnStats, validateBloomFilters,
+          validateRecordIndexCount, validateRecordIndexContent, 
numRecordIndexErrorSamples,
+          minValidateIntervalSeconds, parallelism, recordIndexParallelism, 
ignoreFailed,
+          sparkMaster, sparkMemory, assumeDatePartitioning, propsFilePath, 
configs, help);
     }
   }
 
@@ -444,21 +494,34 @@ public class HoodieMetadataTableValidator implements 
Serializable {
          HoodieMetadataValidationContext fsBasedContext =
              new HoodieMetadataValidationContext(engineContext, cfg, 
metaClient, false)) {
       Set<String> finalBaseFilesForCleaning = baseFilesForCleaning;
-      List<Pair<Boolean, String>> result = 
engineContext.parallelize(allPartitions, 
allPartitions.size()).map(partitionPath -> {
-        try {
-          validateFilesInPartition(metadataTableBasedContext, fsBasedContext, 
partitionPath, finalBaseFilesForCleaning);
-          LOG.info(String.format("Metadata table validation succeeded for 
partition %s (partition %s)", partitionPath, taskLabels));
-          return Pair.of(true, "");
-        } catch (HoodieValidationException e) {
-          LOG.error(
-              String.format("Metadata table validation failed for partition %s 
due to HoodieValidationException (partition %s)",
-                  partitionPath, taskLabels), e);
-          if (!cfg.ignoreFailed) {
-            throw e;
-          }
-          return Pair.of(false, e.getMessage() + " for partition: " + 
partitionPath);
+      List<Pair<Boolean, String>> result = new ArrayList<>(
+          engineContext.parallelize(allPartitions, 
allPartitions.size()).map(partitionPath -> {
+            try {
+              validateFilesInPartition(metadataTableBasedContext, 
fsBasedContext, partitionPath, finalBaseFilesForCleaning);
+              LOG.info(String.format("Metadata table validation succeeded for 
partition %s (partition %s)", partitionPath, taskLabels));
+              return Pair.of(true, "");
+            } catch (HoodieValidationException e) {
+              LOG.error(
+                  String.format("Metadata table validation failed for 
partition %s due to HoodieValidationException (partition %s)",
+                      partitionPath, taskLabels), e);
+              if (!cfg.ignoreFailed) {
+                throw e;
+              }
+              return Pair.of(false, e.getMessage() + " for partition: " + 
partitionPath);
+            }
+          }).collectAsList());
+
+      try {
+        validateRecordIndex(engineContext, metaClient, 
metadataTableBasedContext.getTableMetadata());
+        result.add(Pair.of(true, ""));
+      } catch (HoodieValidationException e) {
+        LOG.error(
+            "Metadata table validation failed due to HoodieValidationException 
in record index validation", e);
+        if (!cfg.ignoreFailed) {
+          throw e;
         }
-      }).collectAsList();
+        result.add(Pair.of(false, e.getMessage()));
+      }
 
       for (Pair<Boolean, String> res : result) {
         finalResult &= res.getKey();
@@ -741,6 +804,174 @@ public class HoodieMetadataTableValidator implements 
Serializable {
     validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, 
"bloom filters");
   }
 
+  private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+                                   HoodieTableMetaClient metaClient,
+                                   HoodieTableMetadata tableMetadata) {
+    if (cfg.validateRecordIndexContent) {
+      validateRecordIndexContent(sparkEngineContext, metaClient, 
tableMetadata);
+    } else if (cfg.validateRecordIndexCount) {
+      validateRecordIndexCount(sparkEngineContext, metaClient);
+    }
+  }
+
+  private void validateRecordIndexCount(HoodieSparkEngineContext 
sparkEngineContext,
+                                        HoodieTableMetaClient metaClient) {
+    String basePath = metaClient.getBasePathV2().toString();
+    long countKeyFromTable = 
sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(basePath)
+        .select(RECORD_KEY_METADATA_FIELD)
+        .count();
+    long countKeyFromRecordIndex = 
sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(getMetadataTableBasePath(basePath))
+        .select("key")
+        .filter("type = 5")
+        .count();
+
+    if (countKeyFromTable != countKeyFromRecordIndex) {
+      String message = String.format("Validation of record index count failed: 
"
+              + "%s entries from record index metadata, %s keys from the data 
table.",
+          countKeyFromRecordIndex, countKeyFromTable);
+      LOG.error(message);
+      throw new HoodieValidationException(message);
+    } else {
+      LOG.info(String.format(
+          "Validation of record index count succeeded: %s entries.", 
countKeyFromRecordIndex));
+    }
+  }
+
+  private void validateRecordIndexContent(HoodieSparkEngineContext 
sparkEngineContext,
+                                          HoodieTableMetaClient metaClient,
+                                          HoodieTableMetadata tableMetadata) {
+    String basePath = metaClient.getBasePathV2().toString();
+    JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
+        sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
+            .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, 
FILENAME_METADATA_FIELD)
+            .toJavaRDD()
+            .mapToPair(row -> new 
Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+                
Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+                    
FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))))))
+            .cache();
+
+    JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
+        sparkEngineContext.getSqlContext().read().format("hudi")
+            .load(getMetadataTableBasePath(basePath))
+            .filter("type = 5")
+            .select(functions.col("key"),
+                
functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+                
functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+                
functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+                functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+                functions.col("recordIndexMetadata.fileId").as("fileId"),
+                
functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+                
functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+            .toJavaRDD()
+            .mapToPair(row -> {
+              HoodieRecordGlobalLocation location = 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+                  row.getString(row.fieldIndex("partitionName")),
+                  row.getInt(row.fieldIndex("fileIdEncoding")),
+                  row.getLong(row.fieldIndex("fileIdHighBits")),
+                  row.getLong(row.fieldIndex("fileIdLowBits")),
+                  row.getInt(row.fieldIndex("fileIndex")),
+                  row.getString(row.fieldIndex("fileId")),
+                  row.getLong(row.fieldIndex("instantTime")));
+              return new Tuple2<>(row.getString(row.fieldIndex("key")),
+                  Pair.of(location.getPartitionPath(), location.getFileId()));
+            });
+
+    int numErrorSamples = cfg.numRecordIndexErrorSamples;
+    Pair<Long, List<String>> result = 
keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, 
cfg.recordIndexParallelism)
+        .map(e -> {
+          Optional<Pair<String, String>> locationOnFs = e._2._1;
+          Optional<Pair<String, String>> locationFromRecordIndex = e._2._2;
+          StringBuilder sb = new StringBuilder();
+          List<String> errorSampleList = new ArrayList<>();
+          if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) 
{
+            if 
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft())
+                && 
locationOnFs.get().getRight().equals(locationFromRecordIndex.get().getRight())) 
{
+              return Pair.of(0L, errorSampleList);
+            }
+            errorSampleList.add(constructLocationInfoString(locationOnFs, 
locationFromRecordIndex));
+            return Pair.of(1L, errorSampleList);
+          }
+          if (!locationOnFs.isPresent() && 
!locationFromRecordIndex.isPresent()) {
+            return Pair.of(0L, errorSampleList);
+          }
+          errorSampleList.add(constructLocationInfoString(locationOnFs, 
locationFromRecordIndex));
+          return Pair.of(1L, errorSampleList);
+        })
+        .reduce((pair1, pair2) -> {
+          long errorCount = pair1.getLeft() + pair2.getLeft();
+          List<String> list1 = pair1.getRight();
+          List<String> list2 = pair2.getRight();
+          if (!list1.isEmpty() && !list2.isEmpty()) {
+            if (list1.size() >= numErrorSamples) {
+              return Pair.of(errorCount, list1);
+            }
+            if (list2.size() >= numErrorSamples) {
+              return Pair.of(errorCount, list2);
+            }
+
+            List<String> resultList = new ArrayList<>();
+            if (list1.size() > list2.size()) {
+              resultList.addAll(list1);
+              for (String item : list2) {
+                resultList.add(item);
+                if (resultList.size() >= numErrorSamples) {
+                  break;
+                }
+              }
+            } else {
+              resultList.addAll(list2);
+              for (String item : list1) {
+                resultList.add(item);
+                if (resultList.size() >= numErrorSamples) {
+                  break;
+                }
+              }
+            }
+            return Pair.of(errorCount, resultList);
+          } else if (!list1.isEmpty()) {
+            return Pair.of(errorCount, list1);
+          } else {
+            return Pair.of(errorCount, list2);
+          }
+        });
+
+    long countKey = keyToLocationOnFsRdd.count();
+    keyToLocationOnFsRdd.unpersist();
+
+    long diffCount = result.getLeft();
+    if (diffCount > 0) {
+      String message = String.format("Validation of record index content 
failed: "
+              + "%s keys (total %s) from the data table have wrong location in 
record index "
+              + "metadata. Sample mismatches: %s",
+          diffCount, countKey, String.join(";", result.getRight()));
+      LOG.error(message);
+      throw new HoodieValidationException(message);
+    } else {
+      LOG.info(String.format(
+          "Validation of record index content succeeded: %s entries.", 
countKey));
+    }
+  }
+
+  private String constructLocationInfoString(Optional<Pair<String, String>> 
locationOnFs,
+                                             Optional<Pair<String, String>> 
locationFromRecordIndex) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("FS: ");
+    if (locationOnFs.isPresent()) {
+      sb.append(locationOnFs.get());
+    } else {
+      sb.append("<empty>");
+    }
+    sb.append(", Record Index: ");
+    if (locationFromRecordIndex.isPresent()) {
+      sb.append(locationFromRecordIndex.get());
+    } else {
+      sb.append("<empty>");
+    }
+    return sb.toString();
+  }
+
   private List<String> getLatestBaseFileNames(HoodieMetadataValidationContext 
fsBasedContext, String partitionPath, Set<String> baseDataFilesForCleaning) {
     List<String> latestBaseFilenameList;
     if (!baseDataFilesForCleaning.isEmpty()) {
@@ -1050,6 +1281,7 @@ public class HoodieMetadataTableValidator implements 
Serializable {
           .enable(enableMetadataTable)
           .withMetadataIndexBloomFilter(enableMetadataTable)
           .withMetadataIndexColumnStats(enableMetadataTable)
+          .withEnableRecordIndex(enableMetadataTable)
           .withAssumeDatePartitioning(cfg.assumeDatePartitioning)
           .build();
       this.fileSystemView = 
FileSystemViewManager.createInMemoryFileSystemView(engineContext,
@@ -1064,6 +1296,10 @@ public class HoodieMetadataTableValidator implements 
Serializable {
       return metaClient;
     }
 
+    public HoodieTableMetadata getTableMetadata() {
+      return tableMetadata;
+    }
+
     public List<HoodieBaseFile> getSortedLatestBaseFileList(String 
partitionPath) {
       return fileSystemView.getLatestBaseFiles(partitionPath)
           .sorted(new HoodieBaseFileComparator()).collect(Collectors.toList());

Reply via email to