nsivabalan commented on code in PR #9437:
URL: https://github.com/apache/hudi/pull/9437#discussion_r1294013795


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -741,6 +791,116 @@ private void validateBloomFilters(
     validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, 
"bloom filters");
   }
 
+  private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+                                   HoodieTableMetaClient metaClient,
+                                   HoodieTableMetadata tableMetadata) {
+    if (cfg.validateRecordIndexContent) {
+      validateRecordIndexContent(sparkEngineContext, metaClient, 
tableMetadata);
+    } else if (cfg.validateRecordIndexCount) {
+      validateRecordIndexCount(sparkEngineContext, metaClient);
+    }
+  }
+
+  private void validateRecordIndexCount(HoodieSparkEngineContext 
sparkEngineContext,
+                                        HoodieTableMetaClient metaClient) {
+    String basePath = metaClient.getBasePathV2().toString();
+    long countKeyFromTable = 
sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(basePath)
+        .select(RECORD_KEY_METADATA_FIELD)
+        .distinct()
+        .count();
+    long countKeyFromRecordIndex = 
sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(getMetadataTableBasePath(basePath))
+        .select("key")
+        .filter("type = 5")
+        .distinct()

Review Comment:
   A snapshot read by itself should return unique values. If there are dups, it's
a bug. Can we remove distinct() here?



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -741,6 +791,116 @@ private void validateBloomFilters(
     validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, 
"bloom filters");
   }
 
+  private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+                                   HoodieTableMetaClient metaClient,
+                                   HoodieTableMetadata tableMetadata) {
+    if (cfg.validateRecordIndexContent) {
+      validateRecordIndexContent(sparkEngineContext, metaClient, 
tableMetadata);
+    } else if (cfg.validateRecordIndexCount) {
+      validateRecordIndexCount(sparkEngineContext, metaClient);
+    }
+  }
+
+  private void validateRecordIndexCount(HoodieSparkEngineContext 
sparkEngineContext,
+                                        HoodieTableMetaClient metaClient) {
+    String basePath = metaClient.getBasePathV2().toString();
+    long countKeyFromTable = 
sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(basePath)
+        .select(RECORD_KEY_METADATA_FIELD)
+        .distinct()
+        .count();
+    long countKeyFromRecordIndex = 
sparkEngineContext.getSqlContext().read().format("hudi")
+        .load(getMetadataTableBasePath(basePath))
+        .select("key")
+        .filter("type = 5")
+        .distinct()
+        .count();
+
+    if (countKeyFromTable != countKeyFromRecordIndex) {
+      String message = String.format("Validation of record index count failed: 
"
+              + "%s entries from record index metadata, %s keys from the data 
table.",
+          countKeyFromRecordIndex, countKeyFromTable);
+      LOG.error(message);
+      throw new HoodieValidationException(message);
+    } else {
+      LOG.info(String.format(
+          "Validation of record index count succeeded: %s entries.", 
countKeyFromRecordIndex));
+    }
+  }
+
+  private void validateRecordIndexContent(HoodieSparkEngineContext 
sparkEngineContext,
+                                          HoodieTableMetaClient metaClient,
+                                          HoodieTableMetadata tableMetadata) {
+    String basePath = metaClient.getBasePathV2().toString();
+    JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
+        sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
+            .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, 
FILENAME_METADATA_FIELD)
+            .toJavaRDD()
+            .mapToPair(row -> new 
Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+                
Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+                    
FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))))))
+            .cache();
+
+    JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
+        sparkEngineContext.getSqlContext().read().format("hudi")
+            .load(getMetadataTableBasePath(basePath))
+            .filter("type = 5")
+            .select(functions.col("key"),
+                
functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+                
functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+                
functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+                functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+                functions.col("recordIndexMetadata.fileId").as("fileId"),
+                
functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+                
functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+            .toJavaRDD()
+            .mapToPair(row -> {
+              HoodieRecordGlobalLocation location = 
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+                  row.getString(row.fieldIndex("partitionName")),
+                  row.getInt(row.fieldIndex("fileIdEncoding")),
+                  row.getLong(row.fieldIndex("fileIdHighBits")),
+                  row.getLong(row.fieldIndex("fileIdLowBits")),
+                  row.getInt(row.fieldIndex("fileIndex")),
+                  row.getString(row.fieldIndex("fileId")),
+                  row.getLong(row.fieldIndex("instantTime")));
+              return new Tuple2<>(row.getString(row.fieldIndex("key")),
+                  Pair.of(location.getPartitionPath(), location.getFileId()));
+            });
+
+    long diffCount = 
keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, 
cfg.recordIndexParallelism)
+        .map(e -> {
+          Optional<Pair<String, String>> locationOnFs = e._2._1;
+          Optional<Pair<String, String>> locationFromRecordIndex = e._2._2;
+          if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) 
{
+            if 
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft())
+                && 
locationOnFs.get().getRight().equals(locationFromRecordIndex.get().getRight())) 
{
+              return 0L;
+            }
+            return 1L;

Review Comment:
   Can we also collect the mismatching records and print them out at the end? Maybe
100 mismatching records would do.
   That will help in investigation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to