nsivabalan commented on code in PR #18029:
URL: https://github.com/apache/hudi/pull/18029#discussion_r2748124026
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -738,18 +738,26 @@ private void initializeFilegroupsAndCommitToRecordIndexPartition(String commitTi
       Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList,
       boolean isPartitionedRLI) throws IOException {
     createRecordIndexDefinition(dataMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, String.valueOf(isPartitionedRLI)));
     HoodieData<HoodieRecord> recordIndexRecords;
+    int fileGroupCount;
     if (isPartitionedRLI) {
-      recordIndexRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+      Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList);
+      fileGroupCount = fgCountAndRecords.getKey();
+      recordIndexRecords = fgCountAndRecords.getValue();
     } else {
       Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords = initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(), dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+      fileGroupCount = fgCountAndRecordIndexRecords.getKey();
       recordIndexRecords = fgCountAndRecordIndexRecords.getRight();
       initializeFilegroupsAndCommit(RECORD_INDEX, RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords, commitTimeForPartition);
     }
+    // Validate record index after commit if duplicates are not allowed
+    if (!dataWriteConfig.allowDuplicatesWithHfileWrites()) {
+      validateRecordIndex(recordIndexRecords.count(), fileGroupCount);
Review Comment:
If you take a look at the DAG, we are not persisting `recordIndexRecords` in some cases, so this `count()` will trigger the DAG again.
I would recommend adding a flag and running the validation only when it is enabled.
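A minimal sketch of the idea, assuming a hypothetical config accessor `isRecordIndexValidationEnabled()` and using `HoodieData#persist`/`unpersist` so that `count()` does not recompute the upstream DAG:

```java
// Sketch only: gate the validation behind an opt-in flag, and cache the
// records so count() does not re-trigger the upstream DAG.
// isRecordIndexValidationEnabled() is a hypothetical config accessor.
if (!dataWriteConfig.allowDuplicatesWithHfileWrites()
    && dataWriteConfig.getMetadataConfig().isRecordIndexValidationEnabled()) {
  recordIndexRecords.persist("MEMORY_AND_DISK_SER");
  try {
    validateRecordIndex(recordIndexRecords.count(), fileGroupCount);
  } finally {
    recordIndexRecords.unpersist();
  }
}
```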
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);
Review Comment:
We should ensure that, under the hood, this calls `getLatestMergedFileSlicesBeforeOrOn()` on the file system view.
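For illustration, a sketch of reading the merged slices directly from the view; `latestInstant` is assumed to hold the metadata table's latest completed instant time:

```java
// Sketch: fetch merged file slices (base file plus reconciled log files)
// directly from the file system view. latestInstant is assumed to be the
// metadata table's latest completed instant time.
List<FileSlice> fileSlices = fsView
    .getLatestMergedFileSlicesBeforeOrOn(partitionName, latestInstant)
    .collect(Collectors.toList());
```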
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -833,6 +842,29 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
+  /**
+   * Validates the record index after bootstrap by comparing the expected record count with the actual
+   * record count stored in the metadata table.
+   *
+   * @param recordCount the expected number of records
+   * @param fileGroupCount the expected number of file groups
+   */
+  private void validateRecordIndex(long recordCount, int fileGroupCount) {
+    String partitionName = MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+    HoodieTableFileSystemView fsView = HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+    List<FileSlice> fileSlices = HoodieTableMetadataUtil.getPartitionLatestFileSlices(metadataMetaClient, Option.of(fsView), partitionName);
+    long totalRecords = 0L;
+    for (FileSlice fileSlice : fileSlices) {
Review Comment:
Why can't we do this in a Spark-distributed manner instead of doing it inline in the driver?
For 10k file groups or more, this is going to take a lot of time in the driver, right?
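Roughly what I am suggesting, as a sketch; `countRecordsInFileSlice` is a hypothetical helper wrapping the per-slice counting that is currently done inline:

```java
// Sketch: distribute the per-file-slice counting via the engine context
// instead of looping in the driver. countRecordsInFileSlice is a
// hypothetical helper wrapping the existing per-slice logic.
int parallelism = Math.max(1, fileSlices.size());
long totalRecords = engineContext
    .map(fileSlices, fileSlice -> countRecordsInFileSlice(fileSlice), parallelism)
    .stream()
    .mapToLong(Long::longValue)
    .sum();
```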
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]