nsivabalan commented on code in PR #18826:
URL: https://github.com/apache/hudi/pull/18826#discussion_r3479804379


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }
+    int parallelism = Math.min(partitionBaseFilePairs.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;
+    return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+        .map(partitionAndBaseFile -> {
+          HoodieBaseFile baseFile = partitionAndBaseFile.getValue();
+          StoragePath path = baseFile.getStoragePath();
+          try {
+            HoodieStorage storage = HoodieStorageUtils.getStorage(path, storageConfBroadcast);
+            return HoodieIOFactory.getIOFactory(storage).getFileFormatUtils(path).getRowCount(storage, path);
+          } catch (Exception e) {
+            LOG.warn("Failed to read row count from base file footer: {}", path, e);
+            return 0L;

Review Comment:
   Addressed in bec52e4dc96c: removed the catch-all. `getRowCount` declares no
checked exceptions, so `UnsupportedOperationException` (from
`HFileUtils.getRowCount`) and `HoodieIOException` (from format-specific readers)
now propagate, matching `countRecordsInHFiles`. An HFile-formatted base file
now fails fast instead of silently contributing 0 to the estimate; an
HFile-friendly estimator path can be a follow-up if needed.
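A minimal sketch of the fail-fast behavior described above. `RowCountReader` is a hypothetical stand-in for `getFileFormatUtils(path).getRowCount(storage, path)`, and a plain Java parallel stream replaces `engineContext.parallelize`, so this only illustrates the control flow: with no catch-all, an unchecked exception from any reader reaches the caller instead of being replaced by 0.

```java
import java.util.List;

class FailFastRowCountSketch {

  /** Hypothetical stand-in for a format-specific footer reader; not a Hudi API. */
  @FunctionalInterface
  interface RowCountReader {
    // No throws clause: implementations can only throw unchecked exceptions.
    long getRowCount(String path);
  }

  /**
   * Sums footer row counts across base files with no catch-all, so an
   * UnsupportedOperationException (e.g. from an HFile reader) or an I/O failure
   * reaches the caller instead of silently contributing 0 to the estimate.
   */
  static long estimateRecordCount(List<String> baseFilePaths, RowCountReader reader) {
    // Plain parallel stream here; the PR fans out via engineContext.parallelize.
    return baseFilePaths.parallelStream()
        .mapToLong(reader::getRowCount)
        .sum();
  }

  public static void main(String[] args) {
    System.out.println(estimateRecordCount(List.of("p1/a.parquet", "p2/b.parquet"), path -> 100L)); // 200
    try {
      estimateRecordCount(List.of("p1/c.hfile"), path -> {
        throw new UnsupportedOperationException("no footer row count for " + path);
      });
    } catch (UnsupportedOperationException e) {
      System.out.println("failed fast");
    }
  }
}
```

The empty-input case needs no special branch here, since summing an empty stream yields 0.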



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
         dataMetaClient,
         dataWriteConfig);
 
-    // Initialize the file groups
-    final int fileGroupCount = estimateFileGroupCount(records);
+    Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();

Review Comment:
   Addressed in bec52e4dc96c: `persist()` is now called in the caller
(`initializeFilegroupsAndCommitToRecordIndexPartition`) only when
`isRecordIndexInitializationValidationEnabled()` is true, and *before* the
bulk insert in both the partitioned and non-partitioned branches. That way,
bulkCommit's materialization populates the cache and `validateRecordIndex#count()`
reads back from it. The matching `unpersist()` lives in the same validation-only
branch. The happy path is deliberately left unpersisted (the RDD is consumed
exactly once by the bulk insert), which keeps the original optimization.
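A minimal sketch of the persist placement described above, assuming a hypothetical `LazyDataset` that, like an unpersisted RDD, re-evaluates its source on every action. It is not Hudi's `HoodieData` API; `collect()` and `count()` merely stand in for `bulkCommit` and `validateRecordIndex#count()`. Persisting before the first action only when a second action will follow keeps both paths at a single evaluation of the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class PersistOnlyForValidationSketch {

  /** Hypothetical lazy dataset: recomputes its source on every action unless persisted. */
  static final class LazyDataset<T> {
    private final Supplier<List<T>> source;
    private List<T> cache;   // filled by the first action after persist()
    private boolean persisted;
    int computeCount;        // how many times the source was evaluated

    LazyDataset(Supplier<List<T>> source) {
      this.source = source;
    }

    void persist() { persisted = true; }

    void unpersist() { persisted = false; cache = null; }

    private List<T> materialize() {
      if (cache != null) {
        return cache;
      }
      computeCount++;
      List<T> rows = source.get();
      if (persisted) {
        cache = rows;
      }
      return rows;
    }

    long count() { return materialize().size(); }

    List<T> collect() { return new ArrayList<>(materialize()); }
  }

  /** Bulk insert consumes the records once; validation, if enabled, is a second action. */
  static <T> long bulkInsertAndMaybeValidate(LazyDataset<T> records, boolean validationEnabled) {
    if (validationEnabled) {
      records.persist(); // before the bulk insert, so its materialization fills the cache
    }
    List<T> written = records.collect(); // stands in for bulkCommit
    if (validationEnabled) {
      try {
        if (records.count() != written.size()) { // stands in for validateRecordIndex#count()
          throw new IllegalStateException("record index validation failed");
        }
      } finally {
        records.unpersist(); // released in the same validation-only branch
      }
    }
    return written.size();
  }

  public static void main(String[] args) {
    LazyDataset<String> records = new LazyDataset<>(() -> List.of("k1", "k2"));
    bulkInsertAndMaybeValidate(records, true);
    System.out.println(records.computeCount); // 1: validation read from the cache
  }
}
```

Without the `persist()` call in the validation branch, the `count()` would re-evaluate the source and `computeCount` would reach 2.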



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
         dataMetaClient,
         dataWriteConfig);
 
-    // Initialize the file groups
-    final int fileGroupCount = estimateFileGroupCount(records);
+    Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+    int minFileGroupCount = bounds.getLeft();
+    int maxFileGroupCount = bounds.getRight();
+
+    int fileGroupCount;
+    if (minFileGroupCount != maxFileGroupCount) {
+      // Estimate file group count based on record count read from base file footer metadata.
+      // Avoids the expensive records.persist() + records.count() pass over materialized records.
+      List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = latestMergedPartitionFileSliceList.stream()

Review Comment:
   Addressed in bec52e4dc96c with a code comment: RLI is keyed by record key, 
and log files in a slice can only carry updates or deletes for existing records 
(RLI updates rewrite the entry rather than add a new one). Log files never 
contribute new distinct record keys, so estimating from base-file row counts
alone cannot undercount the records used for file-group sizing (deletes carried
in log files can only turn it into an overestimate). No follow-up needed unless
we revisit the model that log files only carry updates/deletes.
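A small sketch of the argument above, under the comment's stated model that log files carry only updates and deletes for keys already present in the base file. `FileSlice`, `LogOp`, and both helpers are hypothetical simplifications, not Hudi classes: the estimate counts base-file rows only, and merging the log entries can keep or shrink the distinct key set but never grow it.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BaseFileOnlyEstimateSketch {

  enum OpType { UPDATE, DELETE }

  /** Hypothetical log entry: an update or delete for a key already in the base file. */
  static final class LogOp {
    final OpType type;
    final String recordKey;
    LogOp(OpType type, String recordKey) { this.type = type; this.recordKey = recordKey; }
  }

  /** Hypothetical file slice: base-file record keys plus log entries merged on top. */
  static final class FileSlice {
    final List<String> baseFileKeys;
    final List<LogOp> logOps;
    FileSlice(List<String> baseFileKeys, List<LogOp> logOps) {
      this.baseFileKeys = baseFileKeys;
      this.logOps = logOps;
    }
  }

  /** Sizing estimate: base-file row counts only (what the footers report). */
  static long estimateFromBaseFiles(List<FileSlice> slices) {
    return slices.stream().mapToLong(s -> s.baseFileKeys.size()).sum();
  }

  /** Distinct keys the index must hold after merging each slice's log entries. */
  static long distinctKeysAfterMerge(List<FileSlice> slices) {
    Set<String> keys = new HashSet<>();
    for (FileSlice slice : slices) {
      Set<String> sliceKeys = new HashSet<>(slice.baseFileKeys);
      for (LogOp op : slice.logOps) {
        if (op.type == OpType.DELETE) {
          sliceKeys.remove(op.recordKey); // a delete can only shrink the key set
        }
        // An UPDATE rewrites an existing entry, so the key set is unchanged.
      }
      keys.addAll(sliceKeys);
    }
    return keys.size();
  }

  public static void main(String[] args) {
    List<FileSlice> slices = List.of(
        new FileSlice(List.of("k1", "k2", "k3"),
            List.of(new LogOp(OpType.UPDATE, "k1"), new LogOp(OpType.DELETE, "k2"))),
        new FileSlice(List.of("k4"), List.of()));
    System.out.println(estimateFromBaseFiles(slices));  // 4
    System.out.println(distinctKeysAfterMerge(slices)); // 3
  }
}
```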



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }
+    int parallelism = Math.min(partitionBaseFilePairs.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;

Review Comment:
   As wombatu-kun noted, this name (`storageConfBroadcast`) follows the existing
pattern in `countRecordsInHFiles` (HoodieBackedTableMetadataWriter.java:908),
which this method mirrors. Renaming only the new copy would diverge from the established 
convention; happy to do a separate rename of both call sites as a minor cleanup 
PR if you prefer.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -857,6 +883,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }
+    int parallelism = Math.min(partitionBaseFilePairs.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;
+    return engineContext.parallelize(partitionBaseFilePairs, parallelism)
+        .map(partitionAndBaseFile -> {
+          HoodieBaseFile baseFile = partitionAndBaseFile.getValue();

Review Comment:
   Addressed in 4c398477 (now part of bec52e4dc96c after rebase): switched from
`getValue()` to `getRight()` for consistency with the other `Pair` accesses.



##########
hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestHoodieBackedMetadata.java:
##########
@@ -4379,6 +4379,104 @@ private void changeTableVersion(HoodieTableVersion version) throws IOException {
     }
   }
 
+  /**
+   * Validates that RLI initialization estimates file group count from base file footer metadata
+   * (instead of materializing and counting records) when min != max file group count.
+   */
+  @ParameterizedTest
+  @EnumSource(HoodieTableType.class)
+  public void testRecordIndexFileGroupEstimation(HoodieTableType tableType) throws Exception {

Review Comment:
   Good catch. Fixed in bec52e4dc96c: pinned `growthFactor=2.0` and
`maxFileGroupSizeBytes=2400` in `testRecordIndexFileGroupEstimation`. The
estimate now lands at 8 file groups: 200 records × 2.0 = 400 expected records,
2400 bytes / 48 bytes per entry = 50 records per file group, and 400 / 50 = 8,
which falls between min=1 and max=10. The assertion is tightened to
`fileGroupCount > 1`: if the estimator returns 0 or its output is ignored, the
count clamps to minFileGroupCount=1 and the test fails.
`testRecordIndexWithFixedFileGroupCount` intentionally exercises the bypass
branch (the Javadoc already calls that out), so it is left as-is.
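The arithmetic above can be reproduced with a short sketch. `estimateFileGroupCount` here is a hypothetical re-derivation of the clamp the comment describes, assuming integer division and a 48-byte average index entry (the divisor used above); it is not the actual Hudi implementation, and its rounding details may differ.

```java
class RliFileGroupEstimateSketch {

  /**
   * Hypothetical sketch: expected records = recordCount * growthFactor;
   * records per file group = maxFileGroupSizeBytes / averageRecordSize;
   * the quotient is clamped to [minFileGroupCount, maxFileGroupCount].
   */
  static int estimateFileGroupCount(long recordCount, int averageRecordSize, int minFileGroupCount,
                                    int maxFileGroupCount, double growthFactor, int maxFileGroupSizeBytes) {
    if (minFileGroupCount == maxFileGroupCount) {
      return minFileGroupCount; // fixed count: the bypass branch, no estimate needed
    }
    long expectedRecords = (long) Math.ceil(recordCount * growthFactor);
    long recordsPerFileGroup = Math.max(1L, maxFileGroupSizeBytes / Math.max(1, averageRecordSize));
    long estimate = expectedRecords / recordsPerFileGroup;
    return (int) Math.max(minFileGroupCount, Math.min(maxFileGroupCount, estimate));
  }

  public static void main(String[] args) {
    // Test values from the comment: 200 * 2.0 = 400 records; 2400 / 48 = 50 per group.
    System.out.println(estimateFileGroupCount(200, 48, 1, 10, 2.0, 2400)); // 8
    // An estimator that saw 0 records clamps to the minimum, so "> 1" would fail.
    System.out.println(estimateFileGroupCount(0, 48, 1, 10, 2.0, 2400));   // 1
  }
}
```

This is why asserting `fileGroupCount > 1` rather than `>= 1` distinguishes a working estimator from one whose output is dropped.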



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -824,28 +824,54 @@ private Pair<Integer, HoodieData<HoodieRecord>> initializeRecordIndexPartition(
         dataMetaClient,
         dataWriteConfig);
 
-    // Initialize the file groups
-    final int fileGroupCount = estimateFileGroupCount(records);
+    Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+    int minFileGroupCount = bounds.getLeft();
+    int maxFileGroupCount = bounds.getRight();
+
+    int fileGroupCount;
+    if (minFileGroupCount != maxFileGroupCount) {
+      // Estimate file group count based on record count read from base file footer metadata.
+      // Avoids the expensive records.persist() + records.count() pass over materialized records.
+      List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs = latestMergedPartitionFileSliceList.stream()
+          .filter(p -> p.getRight().getBaseFile().isPresent())
+          .map(p -> Pair.of(p.getLeft(), p.getRight().getBaseFile().get()))
+          .collect(Collectors.toList());
+      fileGroupCount = estimateFileGroupCountFromBaseFiles(
+          partitionBaseFilePairs, minFileGroupCount, maxFileGroupCount);
+      LOG.info("Estimated {} file groups from base file footer metadata", fileGroupCount);
+    } else {
+      // min == max: skip estimation, use the fixed value directly.
+      fileGroupCount = minFileGroupCount;
+      LOG.info("Using user-configured file group count: {}", fileGroupCount);
+    }
+
     LOG.info("Initializing record index with {} file groups.", fileGroupCount);
     return Pair.of(fileGroupCount, records);
   }
 
-  private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
-    int minFileGroupCount;
-    int maxFileGroupCount;
+  /**
+   * Returns the (min, max) file group count bounds for RLI based on which RLI variant is enabled.
+   */
+  private Pair<Integer, Integer> getRLIFileGroupCountBounds() {
     if (dataWriteConfig.isRecordLevelIndexEnabled()) {
-      minFileGroupCount = dataWriteConfig.getRecordLevelIndexMinFileGroupCount();
-      maxFileGroupCount = dataWriteConfig.getRecordLevelIndexMaxFileGroupCount();
-    } else {
-      minFileGroupCount = dataWriteConfig.getGlobalRecordLevelIndexMinFileGroupCount();
-      maxFileGroupCount = dataWriteConfig.getGlobalRecordLevelIndexMaxFileGroupCount();
-    }
-    Supplier<Long> recordCountSupplier = () -> {
-      records.persist("MEMORY_AND_DISK_SER");
-      long count = records.count();
-      LOG.info("Initializing record index with {} mappings", count);
-      return count;
-    };
+      return Pair.of(dataWriteConfig.getRecordLevelIndexMinFileGroupCount(),

Review Comment:
   Good question. Yes — addressed in bec52e4dc96c. The persist now happens in 
the caller before `bulkCommit` (only when validation is enabled), so 
`validateRecordIndex#count()` reads from the cached RDD instead of re-deriving 
keys. See the reply on the related thread above for the full plumbing.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -859,6 +885,32 @@ private int estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Sums row counts read from each base file's footer metadata, in parallel via the engine context.
+   * Used in place of materializing and counting an RDD of records during RLI bootstrap.
+   */
+  private long estimateRecordCountFromBaseFiles(List<Pair<String, HoodieBaseFile>> partitionBaseFilePairs) {
+    if (partitionBaseFilePairs.isEmpty()) {
+      return 0L;
+    }
+    int parallelism = Math.min(partitionBaseFilePairs.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    StorageConfiguration<?> storageConfBroadcast = storageConf;

Review Comment:
   Same response as the earlier thread on this name — mirroring 
`countRecordsInHFiles`'s convention. Open to a rename across both call sites in 
a separate cleanup PR.



-- 
This is an automated message from the Apache Git Service.