This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit b0d56abb6ee7909356db1825ca9c19c0ce97f94f Author: Lokesh Jain <[email protected]> AuthorDate: Sat Nov 8 05:26:32 2025 +0530 fix: Persist RLI index bootstrap records only if estimation is required and add unpersist (#14069) Co-authored-by: Lokesh Jain <[email protected]> --- .../metadata/HoodieBackedTableMetadataWriter.java | 34 +++++++++++++--------- .../hudi/metadata/HoodieTableMetadataUtil.java | 6 ++-- .../hudi/functional/RecordLevelIndexTestBase.scala | 2 +- .../hudi/functional/TestMetadataRecordIndex.scala | 2 +- 4 files changed, 27 insertions(+), 17 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java index 1a4cab3bb185..ab7ba22fdec4 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java @@ -110,6 +110,7 @@ import java.util.Queue; import java.util.Set; import java.util.TreeMap; import java.util.function.Function; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -671,7 +672,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab dataWriteConfig.getProps()); // Initialize the file groups - using the same estimation logic as that of record index - final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(), + final int fileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records::count, RECORD_INDEX_AVERAGE_RECORD_SIZE, dataWriteConfig.getRecordIndexMinFileGroupCount(), dataWriteConfig.getRecordIndexMaxFileGroupCount(), dataWriteConfig.getRecordIndexGrowthFactor(), dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes()); @@ -737,16 +738,19 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab private void initializeFilegroupsAndCommitToRecordIndexPartition(String commitTimeForPartition, Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, boolean isPartitionedRLI) throws IOException { createRecordIndexDefinition(dataMetaClient, Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, String.valueOf(isPartitionedRLI))); + HoodieData<HoodieRecord> recordIndexRecords; if (isPartitionedRLI) { - initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList); + recordIndexRecords = initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition, lazyLatestMergedPartitionFileSliceList); } else { - initializeFilegroupsAndCommit(RECORD_INDEX, RECORD_INDEX.getPartitionPath(), - initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(), - dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()), commitTimeForPartition); + Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords = initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(), + dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()); + recordIndexRecords = fgCountAndRecordIndexRecords.getRight(); + initializeFilegroupsAndCommit(RECORD_INDEX, RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords, commitTimeForPartition); } + recordIndexRecords.unpersist(); } - private void initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String commitTimeForPartition, + private HoodieData<HoodieRecord> initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String commitTimeForPartition, Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList) throws IOException { Map<String, List<Pair<String, FileSlice>>> partitionFileSlicePairsMap = lazyLatestMergedPartitionFileSliceList.get().stream() .collect(Collectors.groupingBy(Pair::getKey)); @@ -782,6 +786,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab initMetadataReader(); long totalInitTime = partitionInitTimer.endTimer(); LOG.info("Initializing partitioned record index in metadata table took {} in ms", totalInitTime); + return records; } private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition( @@ -795,17 +800,14 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab this.getClass().getSimpleName(), dataMetaClient, dataWriteConfig); - records.persist("MEMORY_AND_DISK_SER"); - final long recordCount = records.count(); // Initialize the file groups - final int fileGroupCount = estimateFileGroupCount(recordCount); - - LOG.info("Initializing record index with {} mappings and {} file groups.", recordCount, fileGroupCount); + final int fileGroupCount = estimateFileGroupCount(records); + LOG.info("Initializing record index with {} file groups.", fileGroupCount); return Pair.of(fileGroupCount, records); } - private int estimateFileGroupCount(long recordCount) { + private int estimateFileGroupCount(HoodieData<HoodieRecord> records) { int minFileGroupCount; int maxFileGroupCount; if (dataWriteConfig.isRecordLevelIndexEnabled()) { @@ -815,9 +817,15 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> implements HoodieTab minFileGroupCount = dataWriteConfig.getRecordIndexMinFileGroupCount(); maxFileGroupCount = dataWriteConfig.getRecordIndexMaxFileGroupCount(); } + Supplier<Long> recordCountSupplier = () -> { + records.persist("MEMORY_AND_DISK_SER"); + long count = records.count(); + LOG.info("Initializing record index with {} mappings", count); + return count; + }; return HoodieTableMetadataUtil.estimateFileGroupCount( MetadataPartitionType.RECORD_INDEX, - recordCount, + recordCountSupplier, RECORD_INDEX_AVERAGE_RECORD_SIZE, minFileGroupCount, maxFileGroupCount, diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java index cfa4c921f35d..422fdbc07559 100644 --- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java +++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java @@ -2325,7 +2325,7 @@ public class HoodieTableMetadataUtil { * Estimates the file group count to use for a MDT partition. * * @param partitionType Type of the partition for which the file group count is to be estimated. - * @param recordCount The number of records expected to be written. + * @param recordCountSupplier Supplies the number of records expected to be written. * @param averageRecordSize Average size of each record to be written. * @param minFileGroupCount Minimum number of file groups to use. * @param maxFileGroupCount Maximum number of file groups to use. @@ -2333,14 +2333,16 @@ public class HoodieTableMetadataUtil { * @param maxFileGroupSizeBytes Maximum size of the file group. * @return The estimated number of file groups. */ - public static int estimateFileGroupCount(MetadataPartitionType partitionType, long recordCount, int averageRecordSize, int minFileGroupCount, + public static int estimateFileGroupCount(MetadataPartitionType partitionType, Supplier<Long> recordCountSupplier, int averageRecordSize, int minFileGroupCount, int maxFileGroupCount, float growthFactor, int maxFileGroupSizeBytes) { int fileGroupCount; + long recordCount = -1; // If a fixed number of file groups are desired if ((minFileGroupCount == maxFileGroupCount) && (minFileGroupCount != 0)) { fileGroupCount = minFileGroupCount; } else { + recordCount = recordCountSupplier.get(); // Number of records to estimate for final long expectedNumRecords = (long) Math.ceil((float) recordCount * growthFactor); // Maximum records that should be written to each file group so that it does not go over the size limit required diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala index 0d486e1ddcf8..ae462759e0de 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala @@ -151,7 +151,7 @@ class RecordLevelIndexTestBase extends HoodieStatsIndexTestBase { assertEquals(0, recordIndexMapForDeletedRows.size(), "deleted records should not present in RLI") assertEquals(rowArr.length, recordIndexMap.keySet.size) - val estimatedFileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX, rowArr.length, 48, + val estimatedFileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX, () => rowArr.length, 48, writeConfig.getRecordIndexMinFileGroupCount, writeConfig.getRecordIndexMaxFileGroupCount, writeConfig.getRecordIndexGrowthFactor, writeConfig.getRecordIndexMaxFileGroupSizeBytes) assertEquals(estimatedFileGroupCount, getFileGroupCountForRecordIndex(writeConfig)) diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala index 9806ca93ec93..111947054a91 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala @@ -206,7 +206,7 @@ class TestMetadataRecordIndex extends HoodieSparkClientTestBase { assertEquals(rowArr.length, recordIndexMap.keySet.size) val estimatedFileGroupCount = HoodieTableMetadataUtil.estimateFileGroupCount( - MetadataPartitionType.RECORD_INDEX, rowArr.length, 48, + MetadataPartitionType.RECORD_INDEX, () => rowArr.length, 48, writeConfig.getRecordIndexMinFileGroupCount, writeConfig.getRecordIndexMaxFileGroupCount, writeConfig.getRecordIndexGrowthFactor, writeConfig.getRecordIndexMaxFileGroupSizeBytes) assertEquals(estimatedFileGroupCount, getFileGroupCountForRecordIndex(writeConfig))
