prashantwason commented on code in PR #18029:
URL: https://github.com/apache/hudi/pull/18029#discussion_r2756385848


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);

Review Comment:
   Thanks for catching this. Updated to use 
`getPartitionLatestMergedFileSlices()` which internally calls 
`getLatestMergedFileSlicesBeforeOrOn()` via `getPartitionFileSlices(metaClient, 
fsView, partition, true)`. This ensures we handle cases with pending 
compactions correctly.
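   To illustrate why the merged view matters, here is a minimal stand-alone sketch; `SliceView` and `merge` are illustrative stand-ins, not Hudi's actual `FileSlice`/`HoodieTableFileSystemView` API. While a compaction is pending, the newest slice has log files but no base file yet, so a correct read must fold those log files onto the last completed slice:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for a file slice: one (optional) base file plus log files.
class SliceView {
  final String baseFile;        // null while a compaction is still pending
  final List<String> logFiles;

  SliceView(String baseFile, List<String> logFiles) {
    this.baseFile = baseFile;
    this.logFiles = logFiles;
  }

  // "Merged" view: fold the pending slice's log files onto the last completed
  // slice so no records are missed while the compaction is in flight.
  static SliceView merge(SliceView completed, SliceView pending) {
    List<String> logs = new ArrayList<>(completed.logFiles);
    logs.addAll(pending.logFiles);
    return new SliceView(completed.baseFile, logs);
  }
}
```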



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -738,18 +738,26 @@ private void initializeFilegroupsAndCommitToRecordIndexPartition(String commitTi
                                                                    Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, boolean isPartitionedRLI) throws IOException {
     createRecordIndexDefinition(dataMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, String.valueOf(isPartitionedRLI)));
     HoodieData<HoodieRecord> recordIndexRecords;
+    int fileGroupCount;
     if (isPartitionedRLI) {
-      recordIndexRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+      Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+      fileGroupCount = fgCountAndRecords.getKey();
+      recordIndexRecords = fgCountAndRecords.getValue();
     } else {
       Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords = initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(),
           dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+      fileGroupCount = fgCountAndRecordIndexRecords.getKey();
       recordIndexRecords = fgCountAndRecordIndexRecords.getRight();
       initializeFilegroupsAndCommit(RECORD_INDEX, RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords, commitTimeForPartition);
     }
+    // Validate record index after commit if duplicates are not allowed
+    if (!dataWriteConfig.allowDuplicatesWithHfileWrites()) {
+      validateRecordIndex(recordIndexRecords.count(), fileGroupCount);

Review Comment:
   Good point about the DAG re-triggering. I've added a new config 
`hoodie.metadata.record.index.bootstrap.validation.enable` (disabled by 
default) to explicitly control validation. Users who want this extra safety 
check can opt-in by enabling the config. This also avoids the overhead for the 
common case where validation isn't needed.
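   A stand-alone sketch of this opt-in gate; the config key matches the comment above, but `RecordIndexValidation` and its plumbing are hypothetical, not the PR's actual code:

```java
import java.util.Properties;

// Hypothetical sketch of an opt-in validation gate; in Hudi the real key and
// defaults would live alongside the other hoodie.metadata.* configs.
class RecordIndexValidation {
  static final String VALIDATE_KEY = "hoodie.metadata.record.index.bootstrap.validation.enable";

  static boolean shouldValidate(Properties props) {
    // Disabled by default: validation re-triggers the DAG to count records,
    // which is extra work most bootstraps do not need.
    return Boolean.parseBoolean(props.getProperty(VALIDATE_KEY, "false"));
  }

  static void validate(long expectedRecords, long actualRecords) {
    if (expectedRecords != actualRecords) {
      throw new IllegalStateException(
          "Record index bootstrap mismatch: expected " + expectedRecords
              + " records but found " + actualRecords);
    }
  }
}
```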



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);
+    long totalRecords = 0L;
+    for (FileSlice fileSlice : fileSlices) {

Review Comment:
   Agreed. I've refactored the validation to run in a distributed manner using 
`engineContext.parallelize()`. The base file paths are now distributed across 
executors, and each partition reads the record count from HFiles in parallel. 
The counts are then aggregated. This should scale well even with 10k+ file 
groups.
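   A stand-alone sketch of that map-then-aggregate shape, with a parallel stream standing in for `engineContext.parallelize()` and a caller-supplied function standing in for reading the record count from an HFile footer (both stand-ins are illustrative, not the PR's actual code):

```java
import java.util.List;
import java.util.function.ToLongFunction;

// Sketch of the distributed count: each base file path is mapped to its record
// count (one footer read per file, done in parallel), then the counts are
// summed on the driver.
class DistributedRecordCount {
  static long totalRecords(List<String> baseFilePaths, ToLongFunction<String> readRecordCount) {
    return baseFilePaths.parallelStream()
        .mapToLong(readRecordCount)  // per-file footer read, in parallel
        .sum();                      // aggregate into one total
  }
}
```

Because each task only reads a footer-sized record count rather than the file contents, the per-file work stays small, which is what makes this shape scale to large file-group counts.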



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
