yihua commented on code in PR #9437:
URL: https://github.com/apache/hudi/pull/9437#discussion_r1294125576
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java:
##########
@@ -741,6 +791,116 @@ private void validateBloomFilters(
validate(metadataBasedBloomFilters, fsBasedBloomFilters, partitionPath, "bloom filters");
}
+ private void validateRecordIndex(HoodieSparkEngineContext sparkEngineContext,
+ HoodieTableMetaClient metaClient,
+ HoodieTableMetadata tableMetadata) {
+ if (cfg.validateRecordIndexContent) {
+ validateRecordIndexContent(sparkEngineContext, metaClient, tableMetadata);
+ } else if (cfg.validateRecordIndexCount) {
+ validateRecordIndexCount(sparkEngineContext, metaClient);
+ }
+ }
+
+ private void validateRecordIndexCount(HoodieSparkEngineContext sparkEngineContext,
+ HoodieTableMetaClient metaClient) {
+ String basePath = metaClient.getBasePathV2().toString();
+ long countKeyFromTable = sparkEngineContext.getSqlContext().read().format("hudi")
+ .load(basePath)
+ .select(RECORD_KEY_METADATA_FIELD)
+ .distinct()
+ .count();
+ long countKeyFromRecordIndex = sparkEngineContext.getSqlContext().read().format("hudi")
+ .load(getMetadataTableBasePath(basePath))
+ .select("key")
+ .filter("type = 5")
+ .distinct()
+ .count();
+
+ if (countKeyFromTable != countKeyFromRecordIndex) {
+ String message = String.format("Validation of record index count failed: "
+ + "%s entries from record index metadata, %s keys from the data table.",
+ countKeyFromRecordIndex, countKeyFromTable);
+ LOG.error(message);
+ throw new HoodieValidationException(message);
+ } else {
+ LOG.info(String.format(
+ "Validation of record index count succeeded: %s entries.", countKeyFromRecordIndex));
+ }
+ }
+
+ private void validateRecordIndexContent(HoodieSparkEngineContext sparkEngineContext,
+ HoodieTableMetaClient metaClient,
+ HoodieTableMetadata tableMetadata) {
+ String basePath = metaClient.getBasePathV2().toString();
+ JavaPairRDD<String, Pair<String, String>> keyToLocationOnFsRdd =
+ sparkEngineContext.getSqlContext().read().format("hudi").load(basePath)
+ .select(RECORD_KEY_METADATA_FIELD, PARTITION_PATH_METADATA_FIELD, FILENAME_METADATA_FIELD)
+ .toJavaRDD()
+ .mapToPair(row -> new Tuple2<>(row.getString(row.fieldIndex(RECORD_KEY_METADATA_FIELD)),
+ Pair.of(row.getString(row.fieldIndex(PARTITION_PATH_METADATA_FIELD)),
+ FSUtils.getFileId(row.getString(row.fieldIndex(FILENAME_METADATA_FIELD))))))
+ .cache();
+
+ JavaPairRDD<String, Pair<String, String>> keyToLocationFromRecordIndexRdd =
+ sparkEngineContext.getSqlContext().read().format("hudi")
+ .load(getMetadataTableBasePath(basePath))
+ .filter("type = 5")
+ .select(functions.col("key"),
+ functions.col("recordIndexMetadata.partitionName").as("partitionName"),
+ functions.col("recordIndexMetadata.fileIdHighBits").as("fileIdHighBits"),
+ functions.col("recordIndexMetadata.fileIdLowBits").as("fileIdLowBits"),
+ functions.col("recordIndexMetadata.fileIndex").as("fileIndex"),
+ functions.col("recordIndexMetadata.fileId").as("fileId"),
+ functions.col("recordIndexMetadata.instantTime").as("instantTime"),
+ functions.col("recordIndexMetadata.fileIdEncoding").as("fileIdEncoding"))
+ .toJavaRDD()
+ .mapToPair(row -> {
+ HoodieRecordGlobalLocation location = HoodieTableMetadataUtil.getLocationFromRecordIndexInfo(
+ row.getString(row.fieldIndex("partitionName")),
+ row.getInt(row.fieldIndex("fileIdEncoding")),
+ row.getLong(row.fieldIndex("fileIdHighBits")),
+ row.getLong(row.fieldIndex("fileIdLowBits")),
+ row.getInt(row.fieldIndex("fileIndex")),
+ row.getString(row.fieldIndex("fileId")),
+ row.getLong(row.fieldIndex("instantTime")));
+ return new Tuple2<>(row.getString(row.fieldIndex("key")),
+ Pair.of(location.getPartitionPath(), location.getFileId()));
+ });
+
+ long diffCount = keyToLocationOnFsRdd.fullOuterJoin(keyToLocationFromRecordIndexRdd, cfg.recordIndexParallelism)
+ .map(e -> {
+ Optional<Pair<String, String>> locationOnFs = e._2._1;
+ Optional<Pair<String, String>> locationFromRecordIndex = e._2._2;
+ if (locationOnFs.isPresent() && locationFromRecordIndex.isPresent()) {
+ if (locationOnFs.get().getLeft().equals(locationFromRecordIndex.get().getLeft())
+ && locationOnFs.get().getRight().equals(locationFromRecordIndex.get().getRight())) {
+ return 0L;
+ }
+ return 1L;
Review Comment:
I've found a way to collect sample mismatching entries in the same DAG.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]