This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b0d56abb6ee7909356db1825ca9c19c0ce97f94f
Author: Lokesh Jain <[email protected]>
AuthorDate: Sat Nov 8 05:26:32 2025 +0530

    fix: Persist RLI index bootstrap records only if estimation is required and 
add unpersist (#14069)
    
    Co-authored-by: Lokesh Jain <[email protected]>
---
 .../metadata/HoodieBackedTableMetadataWriter.java  | 34 +++++++++++++---------
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  6 ++--
 .../hudi/functional/RecordLevelIndexTestBase.scala |  2 +-
 .../hudi/functional/TestMetadataRecordIndex.scala  |  2 +-
 4 files changed, 27 insertions(+), 17 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 1a4cab3bb185..ab7ba22fdec4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -110,6 +110,7 @@ import java.util.Queue;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.function.Function;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -671,7 +672,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
         dataWriteConfig.getProps());
 
     // Initialize the file groups - using the same estimation logic as that of 
record index
-    final int fileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records.count(),
+    final int fileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(RECORD_INDEX, records::count,
         RECORD_INDEX_AVERAGE_RECORD_SIZE, 
dataWriteConfig.getRecordIndexMinFileGroupCount(),
         dataWriteConfig.getRecordIndexMaxFileGroupCount(), 
dataWriteConfig.getRecordIndexGrowthFactor(),
         dataWriteConfig.getRecordIndexMaxFileGroupSizeBytes());
@@ -737,16 +738,19 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
   private void initializeFilegroupsAndCommitToRecordIndexPartition(String 
commitTimeForPartition,
                                                                    
Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList, 
boolean isPartitionedRLI) throws IOException {
     createRecordIndexDefinition(dataMetaClient, 
Collections.singletonMap(HoodieRecordIndex.IS_PARTITIONED_OPTION, 
String.valueOf(isPartitionedRLI)));
+    HoodieData<HoodieRecord> recordIndexRecords;
     if (isPartitionedRLI) {
-      
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition,
 lazyLatestMergedPartitionFileSliceList);
+      recordIndexRecords = 
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(commitTimeForPartition,
 lazyLatestMergedPartitionFileSliceList);
     } else {
-      initializeFilegroupsAndCommit(RECORD_INDEX, 
RECORD_INDEX.getPartitionPath(),
-          
initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(),
-              
dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism()), 
commitTimeForPartition);
+      Pair<Integer, HoodieData<HoodieRecord>> fgCountAndRecordIndexRecords = 
initializeRecordIndexPartition(lazyLatestMergedPartitionFileSliceList.get(),
+          dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+      recordIndexRecords = fgCountAndRecordIndexRecords.getRight();
+      initializeFilegroupsAndCommit(RECORD_INDEX, 
RECORD_INDEX.getPartitionPath(), fgCountAndRecordIndexRecords, 
commitTimeForPartition);
     }
+    recordIndexRecords.unpersist();
   }
 
-  private void 
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String 
commitTimeForPartition,
+  private HoodieData<HoodieRecord> 
initializeFilegroupsAndCommitToPartitionedRecordIndexPartition(String 
commitTimeForPartition,
                                                                               
Lazy<List<Pair<String, FileSlice>>> lazyLatestMergedPartitionFileSliceList) 
throws IOException {
     Map<String, List<Pair<String, FileSlice>>> partitionFileSlicePairsMap = 
lazyLatestMergedPartitionFileSliceList.get().stream()
         .collect(Collectors.groupingBy(Pair::getKey));
@@ -782,6 +786,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
     initMetadataReader();
     long totalInitTime = partitionInitTimer.endTimer();
     LOG.info("Initializing partitioned record index in metadata table took {} 
in ms", totalInitTime);
+    return records;
   }
 
   private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition(
@@ -795,17 +800,14 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
         this.getClass().getSimpleName(),
         dataMetaClient,
         dataWriteConfig);
-    records.persist("MEMORY_AND_DISK_SER");
-    final long recordCount = records.count();
 
     // Initialize the file groups
-    final int fileGroupCount = estimateFileGroupCount(recordCount);
-
-    LOG.info("Initializing record index with {} mappings and {} file groups.", 
recordCount, fileGroupCount);
+    final int fileGroupCount = estimateFileGroupCount(records);
+    LOG.info("Initializing record index with {} file groups.", fileGroupCount);
     return Pair.of(fileGroupCount, records);
   }
 
-  private int estimateFileGroupCount(long recordCount) {
+  private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     int minFileGroupCount;
     int maxFileGroupCount;
     if (dataWriteConfig.isRecordLevelIndexEnabled()) {
@@ -815,9 +817,15 @@ public abstract class HoodieBackedTableMetadataWriter<I, 
O> implements HoodieTab
       minFileGroupCount = dataWriteConfig.getRecordIndexMinFileGroupCount();
       maxFileGroupCount = dataWriteConfig.getRecordIndexMaxFileGroupCount();
     }
+    Supplier<Long> recordCountSupplier = () -> {
+      records.persist("MEMORY_AND_DISK_SER");
+      long count = records.count();
+      LOG.info("Initializing record index with {} mappings", count);
+      return count;
+    };
     return HoodieTableMetadataUtil.estimateFileGroupCount(
         MetadataPartitionType.RECORD_INDEX,
-        recordCount,
+        recordCountSupplier,
         RECORD_INDEX_AVERAGE_RECORD_SIZE,
         minFileGroupCount,
         maxFileGroupCount,
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index cfa4c921f35d..422fdbc07559 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -2325,7 +2325,7 @@ public class HoodieTableMetadataUtil {
    * Estimates the file group count to use for a MDT partition.
    *
    * @param partitionType         Type of the partition for which the file 
group count is to be estimated.
-   * @param recordCount           The number of records expected to be written.
+   * @param recordCountSupplier   Supplies the number of records expected to 
be written.
    * @param averageRecordSize     Average size of each record to be written.
    * @param minFileGroupCount     Minimum number of file groups to use.
    * @param maxFileGroupCount     Maximum number of file groups to use.
@@ -2333,14 +2333,16 @@ public class HoodieTableMetadataUtil {
    * @param maxFileGroupSizeBytes Maximum size of the file group.
    * @return The estimated number of file groups.
    */
-  public static int estimateFileGroupCount(MetadataPartitionType 
partitionType, long recordCount, int averageRecordSize, int minFileGroupCount,
+  public static int estimateFileGroupCount(MetadataPartitionType 
partitionType, Supplier<Long> recordCountSupplier, int averageRecordSize, int 
minFileGroupCount,
                                            int maxFileGroupCount, float 
growthFactor, int maxFileGroupSizeBytes) {
     int fileGroupCount;
 
+    long recordCount = -1;
     // If a fixed number of file groups are desired
     if ((minFileGroupCount == maxFileGroupCount) && (minFileGroupCount != 0)) {
       fileGroupCount = minFileGroupCount;
     } else {
+      recordCount = recordCountSupplier.get();
       // Number of records to estimate for
       final long expectedNumRecords = (long) Math.ceil((float) recordCount * 
growthFactor);
       // Maximum records that should be written to each file group so that it 
does not go over the size limit required
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
index 0d486e1ddcf8..ae462759e0de 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/RecordLevelIndexTestBase.scala
@@ -151,7 +151,7 @@ class RecordLevelIndexTestBase extends 
HoodieStatsIndexTestBase {
     assertEquals(0, recordIndexMapForDeletedRows.size(), "deleted records 
should not present in RLI")
 
     assertEquals(rowArr.length, recordIndexMap.keySet.size)
-    val estimatedFileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
 rowArr.length, 48,
+    val estimatedFileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(MetadataPartitionType.RECORD_INDEX,
 () => rowArr.length, 48,
       writeConfig.getRecordIndexMinFileGroupCount, 
writeConfig.getRecordIndexMaxFileGroupCount,
       writeConfig.getRecordIndexGrowthFactor, 
writeConfig.getRecordIndexMaxFileGroupSizeBytes)
     assertEquals(estimatedFileGroupCount, 
getFileGroupCountForRecordIndex(writeConfig))
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
index 9806ca93ec93..111947054a91 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestMetadataRecordIndex.scala
@@ -206,7 +206,7 @@ class TestMetadataRecordIndex extends 
HoodieSparkClientTestBase {
 
     assertEquals(rowArr.length, recordIndexMap.keySet.size)
     val estimatedFileGroupCount = 
HoodieTableMetadataUtil.estimateFileGroupCount(
-      MetadataPartitionType.RECORD_INDEX, rowArr.length, 48,
+      MetadataPartitionType.RECORD_INDEX, () => rowArr.length, 48,
       writeConfig.getRecordIndexMinFileGroupCount, 
writeConfig.getRecordIndexMaxFileGroupCount,
       writeConfig.getRecordIndexGrowthFactor, 
writeConfig.getRecordIndexMaxFileGroupSizeBytes)
     assertEquals(estimatedFileGroupCount, 
getFileGroupCountForRecordIndex(writeConfig))

Reply via email to