hudi-agent commented on code in PR #18826:
URL: https://github.com/apache/hudi/pull/18826#discussion_r3484705446


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -859,6 +905,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   *
+   * <p>Errors (e.g. {@link UnsupportedOperationException} from {@code HFileUtils.getRowCount},
+   * IO failures) are rethrown rather than swallowed — silently returning {@code 0L} per failed
+   * file would systematically undercount the estimate and provision RLI at
+   * {@code minFileGroupCount}. The contract mirrors {@link #countRecordsInHFiles(List)}.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }

Review Comment:
   🤖 nit: `storageConfBroadcast` reads like a Spark `Broadcast<T>` variable, 
but this is just a local alias of `storageConf` to satisfy the lambda's 
effectively-final requirement. Could you rename it to something like 
`localStorageConf` or `storageConfRef` to avoid that confusion?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
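
   A minimal sketch of the pattern this nit refers to, assuming the original local is reassigned before the lambda is created. The class `LocalAliasSketch`, the method `tagPaths`, and the `String` stand-in for the storage configuration are hypothetical and are not taken from the PR; only the name `localStorageConf` comes from the suggestion above.

```java
import java.util.List;
import java.util.stream.Collectors;

class LocalAliasSketch {
  // Hypothetical helper: tags each path with the configuration it would be read with.
  static List<String> tagPaths(List<String> paths, String conf, boolean override) {
    String storageConf = conf;
    if (override) {
      // This reassignment means storageConf is no longer effectively final,
      // so a lambda cannot capture it directly.
      storageConf = conf + "-override";
    }
    // Plain local alias: it is only a second reference to the same value,
    // not a Spark Broadcast<T>, which is why a "Broadcast" suffix would mislead.
    final String localStorageConf = storageConf;
    return paths.stream()
        .map(p -> p + "@" + localStorageConf)
        .collect(Collectors.toList());
  }
}
```

   The alias exists only to satisfy the compiler, so a name that says "local" or "ref" describes it more accurately than one that suggests a cluster-wide broadcast.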



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -826,28 +842,58 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
         dataMetaClient,
         dataWriteConfig);
 
-    // Initialize the file groups
-    final int fileGroupCount = estimateFileGroupCount(records);
+    Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+    int minFileGroupCount = bounds.getLeft();
+    int maxFileGroupCount = bounds.getRight();
+
+    int fileGroupCount;
+    if (minFileGroupCount != maxFileGroupCount) {
+      // Estimate file group count based on record count read from base file footer metadata.
+      // Avoids the expensive records.persist() + records.count() pass over materialized records.
+      // Filtering to base-file-only slices is safe for RLI: log files in a slice can only carry
+      // updates or deletes for existing records, never new inserts (RLI is keyed by record key
+      // and updates rewrite the entry rather than add one), so they do not contribute to the
+      // distinct-record-count estimate that sizes the file groups.
+      List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = latestMergedPartitionFileSliceList.stream()
+          .filter(p -> p.getRight().getBaseFile().isPresent())
+          .map(p -> Pair.of(p.getLeft(), p.getRight().getBaseFile().get()))
+          .collect(Collectors.toList());
+      fileGroupCount = estimateFileGroupCountFromBaseFiles(
+          partitionBaseFilePairs, minFileGroupCount, maxFileGroupCount);
+      LOG.info("Estimated {} file groups from base file footer metadata", fileGroupCount);
+    } else {
+      // min == max: skip estimation, use the fixed value directly.
+      fileGroupCount = minFileGroupCount;
+      LOG.info("Using user-configured file group count: {}", fileGroupCount);
+    }
+
     LOG.info("Initializing record index with {} file groups.", fileGroupCount);
     return Pair.of(fileGroupCount, records);
   }
 
-  private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
-    int minFileGroupCount;
-    int maxFileGroupCount;
+  /**
+   * Returns the (min, max) file group count bounds for RLI based on which RLI variant is enabled.
+   */
+  private Pair<Integer, Integer> getRLIFileGroupCountBounds() {
     if (dataWriteConfig.isRecordLevelIndexEnabled()) {
-      minFileGroupCount = dataWriteConfig.getRecordLevelIndexMinFileGroupCount();
-      maxFileGroupCount = dataWriteConfig.getRecordLevelIndexMaxFileGroupCount();
-    } else {
-      minFileGroupCount = dataWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount();
-      maxFileGroupCount = dataWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount();
+      return Pair.of(dataWriteConfig.getRecordLevelIndexMinFileGroupCount(),
+          dataWriteConfig.getRecordLevelIndexMaxFileGroupCount());
     }
-    Supplier<Long> recordCountSupplier = () -> {
-      records.persist("MEMORY_AND_DISK_SER");
-      long count = records.count();
-      LOG.info("Initializing record index with {} mappings", count);
-      return count;
-    };
+    return Pair.of(dataWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount(),
+        dataWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount());
+  }
+
+  /**
+   * Estimates RLI file group count using a record count derived from base file footer metadata.
+   * Reading row counts from the footer is an O(1)-per-file operation that avoids materializing
+   * records to compute a count.
+   */
+  private int estimateFileGroupCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs,
+                                                  int minFileGroupCount,
+                                                  int maxFileGroupCount) {
+    long estimatedRecordCount = estimateRecordCountFromBaseFiles(partitionBaseFilePairs);
+    LOG.info("Estimated total record count from base file footers: {}", estimatedRecordCount);
+    Supplier<Long> recordCountSupplier = () -> estimatedRecordCount;

Review Comment:
   🤖 nit: the name `recordCountSupplier` carries a connotation of deferred/lazy 
computation, but `estimatedRecordCount` is already eagerly computed on the line 
above. Have you considered inlining it as `() -> estimatedRecordCount` directly 
in the call, or renaming to something like `precomputedCountSupplier` to make 
it clear the count is already in hand?
   
   <sub><i>⚠️ AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
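
   To illustrate the distinction this nit draws, here is a minimal sketch contrasting a supplier that defers work with one that merely wraps a value already computed. Everything in it is hypothetical (the class `SupplierSketch`, `expensiveCount`, `estimate`, and the records-per-group constant); it is not the signature or sizing logic of the real estimation method in the PR.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class SupplierSketch {
  // Counts how often the expensive computation actually runs.
  static final AtomicInteger EXPENSIVE_CALLS = new AtomicInteger();

  // Hypothetical stand-in for an expensive record count.
  static long expensiveCount() {
    EXPENSIVE_CALLS.incrementAndGet();
    return 1_000L;
  }

  // Hypothetical consumer: only asks the supplier for a count when min != max.
  static int estimate(int min, int max, Supplier<Long> countSupplier) {
    if (min == max) {
      return min; // supplier is never invoked on this path
    }
    long recordsPerGroup = 100L; // assumed sizing constant, for illustration only
    long needed = (countSupplier.get() + recordsPerGroup - 1) / recordsPerGroup;
    return (int) Math.max(min, Math.min(max, needed));
  }

  public static void main(String[] args) {
    // Deferred: the method reference runs only if estimate() calls get().
    System.out.println(estimate(4, 4, SupplierSketch::expensiveCount));

    // Precomputed: the cost is paid here, and the lambda just returns the value.
    long estimatedRecordCount = expensiveCount();
    System.out.println(estimate(1, 20, () -> estimatedRecordCount));
  }
}
```

   In the second call the supplier adds no laziness, which is the reason the comment suggests either inlining the lambda or choosing a name that signals the count is already in hand.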



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
