This is an automated email from the ASF dual-hosted git repository.
pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new fb7b1a5d9111 feat(metadata-table): Add count validation for record
index bootstrap (#18029)
fb7b1a5d9111 is described below
commit fb7b1a5d9111f41fbcd741edc62f036cc9e344ed
Author: Prashant Wason <[email protected]>
AuthorDate: Wed Feb 25 16:08:02 2026 -0800
feat(metadata-table): Add count validation for record index bootstrap
(#18029)
* Add count validation for record index bootstrap of a table
This commit adds validation for record index bootstrap by comparing
the expected record count with the actual record count stored in the
metadata table. This helps ensure data integrity during the bootstrap
process.
Changes:
- Added validateRecordIndex method in HoodieBackedTableMetadataWriter
to validate record counts after bootstrap
- Added getTotalRecordIndexRecords method in HoodieBackedTableMetadata
to get total records from file slice base files
- Updated initializeFilegroupsAndCommitToRecordIndexPartition to call
validation after commit when duplicates are not allowed
* Address PR review comments for record index validation
- Add config flag hoodie.metadata.record.index.bootstrap.validation.enable
(disabled by default) to explicitly control validation
- Use getPartitionLatestMergedFileSlices() to handle pending compactions
- Make validation distributed using engineContext.parallelize() instead
of iterating in driver
- Remove unused getTotalRecordIndexRecords() from HoodieBackedTableMetadata
Co-Authored-By: Claude Opus 4.5 <[email protected]>
* Address review comments: close FSV, rename config, enable in test
1. Close FSView at the end of validateRecordIndex method using try-finally
2. Renamed config from
'hoodie.metadata.record.index.bootstrap.validation.enable'
to 'hoodie.metadata.record.index.enable.validation.on.initialization'
3. Enabled validation in testRollbackPendingCommitWithRecordIndex test
Co-Authored-By: Claude Opus 4.5 <[email protected]>
---------
Co-authored-by: Claude Opus 4.5 <[email protected]>
---
.../metadata/HoodieBackedTableMetadataWriter.java | 97 +++++++++++++++++++++-
.../hudi/common/config/HoodieMetadataConfig.java | 13 +++
.../hudi/functional/TestHoodieBackedMetadata.java | 2 +
3 files changed, 108 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index a78fd998866e..327cdc1b4b05 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -24,8 +24,10 @@ import org.apache.hudi.avro.model.HoodieIndexPlan;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRestorePlan;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.io.storage.HoodieAvroFileReader;
import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.EngineType;
@@ -79,6 +81,7 @@ import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.index.record.HoodieRecordIndex;
import org.apache.hudi.internal.schema.InternalSchema;
+import org.apache.hudi.io.storage.HoodieIOFactory;
import org.apache.hudi.internal.schema.utils.SerDeHelper;
import org.apache.hudi.metadata.HoodieTableMetadataUtil.DirectoryInfo;
import org.apache.hudi.storage.HoodieStorage;
@@ -740,18 +743,26 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList,
boolean isPartitionedRLI) throws IOException {
createRecordIndexDefinition(dataMetaClient,
Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION,
String.valueOf(isPartitionedRLI)));
HoodieData<HoodieRecord> recordIndexRecords;
+ int fileGroupCount;
if (isPartitionedRLI) {
- recordIndexRecords =
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition,
lazyLatestMergedPartitionFileSliceList);
+ Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecords =
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition,
lazyLatestMergedPartitionFileSliceList);
+ fileGroupCount = fgCountAndRecords.getKey();
+ recordIndexRecords = fgCountAndRecords.getValue();
} else {
Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords =
initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ fileGroupCount = fgCountAndRecordIndexRecords.getKey();
recordIndexRecords = fgCountAndRecordIndexRecords.getRight();
initializeFilegroupsAndCommit(RECORD_INDEX,
RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords,
commitTimeForPartition);
}
+ // Validate record index after commit if validation is enabled
+ if
(dataWriteConfig.getMetadataConfig().isRecordIndexInitializationValidationEnabled())
{
+ validateRecordIndex(recordIndexRecords, fileGroupCount);
+ }
recordIndexRecords.unpersist();
}
- private HoodieData<HoodieRecord>
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String
commitTimeForPartition,
+ private Pair<Integer, HoodieData<HoodieRecord>>
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String
commitTimeForPartition,
Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList)
throws IOException {
Map<String, List<Pair<String, FileSlice>>> partitionFileSlicePairsMap =
lazyLatestMergedPartitionFileSliceList.get().stream()
.collect(Collectors.groupingBy(Pair::getKey));
@@ -762,8 +773,9 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
fileGroupCountAndRecordsPairMap.put(partition,
initializeRecordIndexPartition(partitionFileSlicePairsMap.get(partition),
maxParallelismPerHudiPartition));
}
+ int totalFileGroupCount =
fileGroupCountAndRecordsPairMap.values().stream().mapToInt(Pair::getLeft).sum();
if (LOG.isInfoEnabled()) {
- LOG.info("Initializing partitioned record index with {} mappings",
fileGroupCountAndRecordsPairMap.values().stream().mapToInt(Pair::getLeft).sum());
+ LOG.info("Initializing partitioned record index with {} mappings",
totalFileGroupCount);
}
HoodieTimer partitionInitTimer = HoodieTimer.start();
@@ -787,7 +799,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O>
implements HoodieTab
initMetadataReader();
long totalInitTime = partitionInitTimer.endTimer();
LOG.info("Initializing partitioned record index in metadata table took {}
in ms", totalInitTime);
- return records;
+ return Pair.of(totalFileGroupCount, records);
}
private Pair<Integer, HoodieData<HoodieRecord>>
initializeRecordIndexPartition(
@@ -835,6 +847,83 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
);
}
+ /**
+ * Validates the record index after bootstrap by comparing the number of records that were
+ * expected to be written with the number of records found in the metadata table.
+ *
+ * <p>The actual count is obtained by listing the latest merged file slices of the record index
+ * partition and summing the record counts reported by their base files through
+ * {@code countRecordsInHFiles}; file slices without a base file contribute nothing to the total.
+ * The expected count is {@code recordIndexRecords.count()}.
+ *
+ * <p>Only the record count is enforced. {@code fileGroupCount} is reported in the summary log
+ * line next to the number of file slices found, but a mismatch there does not fail validation.
+ * The file system view opened for the metadata table is closed in a finally block, so it is
+ * released whether or not the check passes.
+ *
+ * <p>NOTE(review): a count mismatch presumably surfaces as an IllegalArgumentException raised by
+ * {@code ValidationUtils.checkArgument} - confirm against that helper.
+ *
+ * @param recordIndexRecords the HoodieData containing the expected records
+ * @param fileGroupCount the expected number of file groups (used for logging only)
+ */
+ private void validateRecordIndex(HoodieData<HoodieRecord>
recordIndexRecords, int fileGroupCount) {
+ String partitionName =
MetadataPartitionType.RECORD_INDEX.getPartitionPath();
+ HoodieTableFileSystemView fsView =
HoodieTableMetadataUtil.getFileSystemViewForMetadataTable(metadataMetaClient);
+ try {
+ // Use merged file slices to handle cases with pending compactions
+ List<FileSlice> fileSlices =
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient,
fsView, partitionName);
+
+ // Filter to only file slices with base files and extract their storage
paths
+ List<StoragePath> baseFilePaths = fileSlices.stream()
+ .filter(fs -> fs.getBaseFile().isPresent())
+ .map(fs -> fs.getBaseFile().get().getStoragePath())
+ .collect(Collectors.toList());
+
+ // Count records in a distributed manner using the engine context
+ long totalRecords = countRecordsInHFiles(baseFilePaths);
+ long expectedRecordCount = recordIndexRecords.count();
+
+ ValidationUtils.checkArgument(totalRecords == expectedRecordCount,
"Record Count Validation failed with "
+ + totalRecords + " present in record index vs the expected " +
expectedRecordCount);
+ LOG.info(String.format("Record index initialized on %d shards (expected
= %d) with %d records (expected = %d)",
+ fileSlices.size(), fileGroupCount, totalRecords,
expectedRecordCount));
+ } finally {
+ fsView.close();
+ }
+ }
+
+ /**
+ * Counts the total number of records in the given base files in a distributed manner.
+ *
+ * <p>The paths are parallelized through the engine context with a parallelism of
+ * {@code min(baseFilePaths.size(), recordIndexMaxParallelism)}. Each partition opens a reader
+ * for every path it receives, adds up {@code getTotalRecords()}, closes the reader in a finally
+ * block, and emits a single partial count; the partial counts are then collected and summed in
+ * this method. An empty input returns 0 before anything is parallelized.
+ *
+ * <p>Readers are created for the metadata table's configured base file format (read from
+ * {@code metadataMetaClient.getTableConfig()}) rather than a hard-coded one.
+ * NOTE(review): the method name assumes that format is HFile - confirm.
+ *
+ * <p>An IOException raised while opening or reading a file is rethrown inside the task as a
+ * HoodieIOException that names the offending path.
+ *
+ * @param baseFilePaths list of storage paths to the base files to count (presumably HFiles)
+ * @return total number of records across all given files, or 0 if the list is empty
+ */
+ private long countRecordsInHFiles(List<StoragePath> baseFilePaths) {
+ if (baseFilePaths.isEmpty()) {
+ return 0L;
+ }
+
+ int parallelism = Math.min(baseFilePaths.size(),
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ StorageConfiguration<?> storageConfBroadcast = storageConf;
+ HoodieFileFormat baseFileFormat =
metadataMetaClient.getTableConfig().getBaseFileFormat();
+
+ return engineContext.parallelize(baseFilePaths, parallelism)
+ .mapPartitions(pathIterator -> {
+ long count = 0L;
+ while (pathIterator.hasNext()) {
+ StoragePath path = pathIterator.next();
+ try {
+ HoodieStorage storage = HoodieStorageUtils.getStorage(path,
storageConfBroadcast);
+ HoodieConfig readerConfig = new HoodieConfig();
+ HoodieAvroFileReader reader = (HoodieAvroFileReader)
HoodieIOFactory.getIOFactory(storage)
+ .getReaderFactory(HoodieRecord.HoodieRecordType.AVRO)
+ .getFileReader(readerConfig, path, baseFileFormat,
Option.empty());
+ try {
+ count += reader.getTotalRecords();
+ } finally {
+ reader.close();
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Error reading total records from
file " + path, e);
+ }
+ }
+ return Collections.singletonList(count).iterator();
+ }, true)
+ .collectAsList()
+ .stream()
+ .mapToLong(Long::longValue)
+ .sum();
+ }
+
/**
* Fetch record locations from FileSlice snapshot.
*
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index f0ad66449396..4b1447d3ca09 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -624,6 +624,15 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.withDocumentation("when set to true, it fails the job on metadata
table's "
+ "table services operation failure");
+ public static final ConfigProperty<Boolean>
RECORD_INDEX_INITIALIZATION_VALIDATION_ENABLE = ConfigProperty
+ .key(METADATA_PREFIX +
".record.index.enable.validation.on.initialization")
+ .defaultValue(false)
+ .markAdvanced()
+ .sinceVersion("1.1.0")
+ .withDocumentation("Enable validation of record index after
initialization by comparing the expected record count "
+ + "with the actual record count stored in the metadata table. This
validation runs in a distributed manner "
+ + "using the compute engine. Disabled by default as it adds overhead
to the initialization process.");
+
public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}
@@ -768,6 +777,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getInt(RECORD_INDEX_MAX_PARALLELISM);
}
+ public boolean isRecordIndexInitializationValidationEnabled() {
+ return getBooleanOrDefault(RECORD_INDEX_INITIALIZATION_VALIDATION_ENABLE);
+ }
+
public boolean shouldAutoInitialize() {
return getBoolean(AUTO_INITIALIZE);
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
index 7268b3849308..a766d30c2058 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java
@@ -2916,6 +2916,8 @@ public class TestHoodieBackedMetadata extends
TestHoodieMetadataBase {
Properties props = new Properties();
props.setProperty(HoodieMetadataConfig.GLOBAL_RECORD_LEVEL_INDEX_ENABLE_PROP.key(),
"true");
props.setProperty(HoodieIndexConfig.INDEX_TYPE.key(), "RECORD_INDEX");
+ // Enable validation of record index on initialization
+
props.setProperty(HoodieMetadataConfig.RECORD_INDEX_INITIALIZATION_VALIDATION_ENABLE.key(),
"true");
HoodieWriteConfig cfg = getWriteConfigBuilder(true, true, false)
.withProps(props).build();
// Initialize write client.