yihua commented on code in PR #9437:
URL: https://github.com/apache/hudi/pull/9437#discussion_r1294049901
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -741,6 +791,116 @@ private void validateBloomFilters(
validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath,
"bloom filters");
}
+ private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+ HoodieTableMetaClient metaClient,
+ HoodieTableMetadata tableMetadata) {
+ if (cfg.validateRecordIndexContent) {
+ validateRecordIndexContent(sparkEngineContext, metaClient,
tableMetadata);
+ } else if (cfg.validateRecordIndexCount) {
+ validateRecordIndexCount(sparkEngineContext, metaClient);
+ }
+ }
+
+ private void validateRecordIndexCount(HoodieSparkEngineContext
sparkEngineContext,
+ HoodieTableMetaClient metaClient) {
+ String basePath = metaClient.getBasePathV2().toString();
+ long countKeyFromTable =
sparkEngineContext.getSqlContext().read().format("hudi")
+ .load(basePath)
+ .select(RECORD_KEY_METADATA_FIELD)
+ .distinct()
+ .count();
+ long countKeyFromRecordIndex =
sparkEngineContext.getSqlContext().read().format("hudi")
+ .load(getMetadataTableBasePath(basePath))
+ .select("key")
+ .filter("type = 5")
+ .distinct()
+ .count();
+
+ if (countKeyFromTable != countKeyFromRecordIndex) {
+ String message = String.format("Validation of record index count failed:
"
+ + "%s entries from record index metadata, %s keys from the data
table.",
+ countKeyFromRecordIndex, countKeyFromTable);
+ LOG.error(message);
+ throw new HoodieValidationException(message);
+ } else {
+ LOG.info(String.format(
+ "Validation of record index count succeeded: %s entries.",
countKeyFromRecordIndex));
+ }
+ }
+
+ private void validateRecordIndexContent(HoodieSparkEngineContext
sparkEngineContext,
+ HoodieTableMetaClient metaClient,
+ HoodieTableMetadata tableMetadata) {
+ String basePath = metaClient.getBasePathV2().toString();
+ JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
+ sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
+ .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD,
FILENAME_METADATA_FIELD)
+ .toJavaRDD()
+ .mapToPair(row -> new
Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+
Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+
FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))))))
+ .cache();
+
+ JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
+ sparkEngineContext.getSqlContext().read().format("hudi")
+ .load(getMetadataTableBasePath(basePath))
+ .filter("type = 5")
+ .select(functions.col("key"),
+
functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+
functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+
functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+ functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+ functions.col("recordIndexMetadata.fileId").as("fileId"),
+
functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+
functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+ .toJavaRDD()
+ .mapToPair(row -> {
+ HoodieRecordGlobalLocation location =
HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+ row.getString(row.fieldIndex("partitionName")),
+ row.getInt(row.fieldIndex("fileIdEncoding")),
+ row.getLong(row.fieldIndex("fileIdHighBits")),
+ row.getLong(row.fieldIndex("fileIdLowBits")),
+ row.getInt(row.fieldIndex("fileIndex")),
+ row.getString(row.fieldIndex("fileId")),
+ row.getLong(row.fieldIndex("instantTime")));
+ return new Tuple2<>(row.getString(row.fieldIndex("key")),
+ Pair.of(location.getPartitionPath(), location.getFileId()));
+ });
+
+ long diffCount =
keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd,
cfg.recordIndexParallelism)
+ .map(e -> {
+ Optional<Pair<String, String>> locationOnFs = e._2._1;
+ Optional<Pair<String, String>> locationFromRecordIndex = e._2._2;
+ if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent())
{
+ if
(locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft())
+ &&
locationOnFs.get().getRight().equals(locationFromRecordIndex.get().getRight()))
{
+ return 0L;
+ }
+ return 1L;
Review Comment:
I considered collecting the mismatched records. That introduces additional
overhead by either re-triggering the DAG in Spark (5-10 minutes for a small
table) or requiring more memory and disk spill for caching the intermediate
RDD. I'll add the functionality of collecting sample records, guarded by a
config flag and turned off by default.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]