nsivabalan commented on code in PR #18029:
URL: https://github.com/apache/hudi/pull/18029#discussion_r2748124026


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -738,18 +738,26 @@ private void initializeFilegroupsAndCommitToRecordIndexPartition(String commitTi
                                                                     Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, boolean isPartitionedRLI) throws IOException {
     createRecordIndexDefinition(dataMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, String.valueOf(isPartitionedRLI)));
     HoodieData<HoodieRecord> recordIndexRecords;
+    int fileGroupCount;
     if (isPartitionedRLI) {
-      recordIndexRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+      Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+      fileGroupCount = fgCountAndRecords.getKey();
+      recordIndexRecords = fgCountAndRecords.getValue();
     } else {
       Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords = initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(),
           dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+      fileGroupCount = fgCountAndRecordIndexRecords.getKey();
       recordIndexRecords = fgCountAndRecordIndexRecords.getRight();
       initializeFilegroupsAndCommit(RECORD_INDEX, RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords, commitTimeForPartition);
     }
+    // Validate record index after commit if duplicates are not allowed
+    if (!dataWriteConfig.allowDuplicatesWithHfileWrites()) {
+      validateRecordIndex(recordIndexRecords.count(), fileGroupCount);

Review Comment:
   If you take a look at the DAG, we are not persisting `recordIndexRecords` in some cases, so calling `count()` here will trigger the DAG again.
   
   I would recommend adding a flag and performing the validation only when it is enabled.
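   For illustration, a minimal sketch of such a guard; the flag name `isRecordIndexValidationEnabled` is hypothetical and would need a corresponding write config entry:
   
   ```java
   // Pay the validation cost (an extra action over the DAG) only when explicitly enabled.
   if (dataWriteConfig.isRecordIndexValidationEnabled()   // hypothetical flag
       && !dataWriteConfig.allowDuplicatesWithHfileWrites()) {
     // count() re-executes the upstream DAG if recordIndexRecords was not persisted,
     // so persisting it beforehand may be worthwhile when the flag is on.
     validateRecordIndex(recordIndexRecords.count(), fileGroupCount);
   }
   ```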
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);

Review Comment:
   We should ensure that, under the hood, this calls `getLatestMergedFileSlicesBeforeOrOn()` on the file system view.
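   For example, a sketch of what the lookup could use, assuming a helper like `getPartitionLatestMergedFileSlices` that delegates to `getLatestMergedFileSlicesBeforeOrOn()` (the exact helper name is an assumption to verify against `HoodieTableMetadataUtil`):
   
   ```java
   // Merged file slices fold in the log files up to the latest committed instant,
   // so the validation counts against the same view a reader would see.
   List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(
       metadataMetaClient, fsView, partitionName);
   ```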



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);
+    long totalRecords = 0L;
+    for (FileSlice fileSlice : fileSlices) {

Review Comment:
   Why can't we do this in a Spark-distributed manner instead of doing it inline in the driver? For 10k file groups or more, this is going to take a lot of time in the driver, right?
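   A sketch of the distributed alternative, assuming the writer's `engineContext` is in scope and a hypothetical helper `countRecordsInFileSlice` that wraps the existing per-slice read:
   
   ```java
   // Fan the per-file-slice counting out to executors; the driver then only
   // sums one Long per file group instead of scanning every slice itself.
   List<Long> perSliceCounts = engineContext.map(fileSlices,
       fileSlice -> countRecordsInFileSlice(fileSlice),   // hypothetical helper
       dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
   long totalRecords = perSliceCounts.stream().mapToLong(Long::longValue).sum();
   ```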
   


