prashantwason commented on code in PR #18029:
URL: https://github.com/apache/hudi/pull/18029#discussion_r2756385848
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount    the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);
Review Comment:
Thanks for catching this. Updated to use `getPartitionLatestMergedFileSlices()`, which internally calls `getLatestMergedFileSlicesBeforeOrOn()` via `getPartitionFileSlices(metaClient, fsView, partition, true)`. This ensures cases with pending compactions are handled correctly.
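To illustrate why the merged view matters, here is a minimal stand-alone Java sketch (all types and names are illustrative stand-ins, not Hudi's actual `FileSlice` API): while a compaction is pending, the latest file slice may not have its base file yet, and the merged view folds the previous completed slice's files into it.

```java
import java.util.ArrayList;
import java.util.List;

public class MergedSliceSketch {
  // Illustrative stand-in for Hudi's FileSlice; not the real class.
  record FileSlice(String baseFile, List<String> logFiles) {}

  // Merged view: if the latest slice's base file is still being compacted
  // (null here), fold the previous completed slice's files into it so a
  // reader sees a complete slice rather than an empty placeholder.
  static FileSlice merge(FileSlice pendingLatest, FileSlice previous) {
    if (pendingLatest.baseFile() == null) {
      List<String> logs = new ArrayList<>(previous.logFiles());
      logs.addAll(pendingLatest.logFiles());
      return new FileSlice(previous.baseFile(), logs);
    }
    return pendingLatest; // no pending compaction: latest slice is complete
  }
}
```

A validation that only looked at the un-merged latest slices could undercount records while a compaction is in flight, which is the case the review comment calls out.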
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -738,18 +738,26 @@ private void initializeFilegroupsAndCommitToRecordIndexPartition(String commitTi
Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList,
boolean isPartitionedRLI) throws IOException {
   createRecordIndexDefinition(dataMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, String.valueOf(isPartitionedRLI)));
   HoodieData<HoodieRecord> recordIndexRecords;
+  int fileGroupCount;
   if (isPartitionedRLI) {
-    recordIndexRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+    Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+    fileGroupCount = fgCountAndRecords.getKey();
+    recordIndexRecords = fgCountAndRecords.getValue();
   } else {
     Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords = initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    fileGroupCount = fgCountAndRecordIndexRecords.getKey();
     recordIndexRecords = fgCountAndRecordIndexRecords.getRight();
     initializeFilegroupsAndCommit(RECORD_INDEX, RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords, commitTimeForPartition);
   }
+  // Validate record index after commit if duplicates are not allowed
+  if (!dataWriteConfig.allowDuplicatesWithHfileWrites()) {
+    validateRecordIndex(recordIndexRecords.count(), fileGroupCount);
Review Comment:
Good point about the DAG re-triggering. I've added a new config `hoodie.metadata.record.index.bootstrap.validation.enable` (disabled by default) to explicitly control validation. Users who want this extra safety check can opt in by enabling the config. This also avoids the overhead for the common case where validation isn't needed.
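The opt-in gate described above can be sketched as a plain Java helper. Only the config key string comes from the comment; the `ConfigProperty` machinery Hudi actually uses is omitted, and `isValidationEnabled` is a hypothetical name for illustration.

```java
import java.util.Map;

public class ValidationConfigSketch {
  // Key taken from the review comment; default is disabled, i.e. opt-in.
  static final String VALIDATION_ENABLE_KEY =
      "hoodie.metadata.record.index.bootstrap.validation.enable";
  static final boolean VALIDATION_ENABLE_DEFAULT = false;

  // Returns true only when the user explicitly enables validation.
  static boolean isValidationEnabled(Map<String, String> props) {
    return Boolean.parseBoolean(
        props.getOrDefault(VALIDATION_ENABLE_KEY,
            String.valueOf(VALIDATION_ENABLE_DEFAULT)));
  }
}
```

With the default of `false`, the `recordIndexRecords.count()` action (and the DAG re-trigger it implies) is skipped entirely unless the user asks for it.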
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount    the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);
+    long totalRecords = 0L;
+    for (FileSlice fileSlice : fileSlices) {
Review Comment:
Agreed. I've refactored the validation to run in a distributed manner using `engineContext.parallelize()`: the base file paths are distributed across executors, each partition reads the record count from its HFiles in parallel, and the counts are then aggregated. This should scale well even with 10k+ file groups.
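The distribute-then-aggregate shape described above can be sketched stand-alone. A parallel stream stands in for `engineContext.parallelize()`, and the per-file counter is a stub; in the real code it would open each HFile and read its entry count. All names here are hypothetical.

```java
import java.util.List;

public class DistributedCountSketch {
  // Stub for reading the record count of one HFile base file; the real
  // implementation would open the file and read its entry count.
  static long countRecordsInBaseFile(String baseFilePath) {
    return baseFilePath.length(); // deterministic fake count for the sketch
  }

  // Distribute the base file paths, count each in parallel, then aggregate.
  static long totalRecordCount(List<String> baseFilePaths) {
    return baseFilePaths.parallelStream()
        .mapToLong(DistributedCountSketch::countRecordsInBaseFile)
        .sum();
  }
}
```

The expected count computed on the driver is then compared against this aggregated total, so no single executor (or the driver) ever has to iterate all file groups serially.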
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]