codope commented on code in PR #18354:
URL: https://github.com/apache/hudi/pull/18354#discussion_r3006279548


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -847,6 +887,195 @@ private int 
estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Estimates total record count using size-weighted sampling:
+   * 1. Sample a fraction of file slices and count their actual records 
(lightweight - no RLI record construction)
+   * 2. Calculate average records-per-byte ratio from samples
+   * 3. Use file sizes of ALL file slices to estimate total records
+   *
+   * If total file slices are below threshold, counts all file slices directly 
without sampling.
+   * Threshold: 50 for global RLI, 10 for partitioned RLI.
+   *
+   * @param partitionFileSliceList list of partition and file slice pairs
+   * @param samplingFraction fraction of file slices to sample
+   * @return estimated total record count
+   */
+  private long estimateRecordCountBySizeWeightedSampling(List<Pair<String, 
FileSlice>> partitionFileSliceList,
+                                                          double 
samplingFraction) {
+    if (partitionFileSliceList.isEmpty()) {
+      return 0L;
+    }
+
+    int totalFileSlices = partitionFileSliceList.size();
+
+    // Determine threshold based on RLI type
+    // Global RLI: higher threshold (50) since it's one big partition
+    // Partitioned RLI: lower threshold (10) since we process per partition
+    int samplingThreshold = dataWriteConfig.isRecordLevelIndexEnabled() ? 10 : 
50;
+
+    // For small number of file slices, skip sampling and count all
+    if (totalFileSlices <= samplingThreshold) {
+      LOG.info("Total file slices ({}) <= threshold ({}), counting all file 
slices without sampling",
+          totalFileSlices, samplingThreshold);
+      Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(partitionFileSliceList);
+      return countAndSize.getLeft();
+    }
+
+    int sampleSize = Math.max(1, (int) Math.ceil(totalFileSlices * 
samplingFraction));
+
+    LOG.info("Sampling {} out of {} file slices ({:.1f}%) to estimate record 
count",
+        sampleSize, totalFileSlices, samplingFraction * 100);
+
+    // Sample file slices uniformly
+    List<Pair<String, FileSlice>> sampledFileSlices = 
sampleFileSlicesUniformly(partitionFileSliceList, sampleSize);
+
+    // Count records from sampled file slices (lightweight - just counting, no 
RLI record construction)
+    // Returns: (recordCount, actualSampledBaseFileSize) - only counts file 
slices with base files
+    Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(sampledFileSlices);
+    long sampledRecordCount = countAndSize.getLeft();
+    long actualSampledSize = countAndSize.getRight();
+
+    LOG.info("Counted {} records from {} bytes of actual sampled base files", 
sampledRecordCount, actualSampledSize);
+
+    if (actualSampledSize == 0) {
+      LOG.warn("No base files found in sampled file slices, returning 0 as 
estimate");
+      return 0L;
+    }
+
+    double recordsPerByte = (double) sampledRecordCount / actualSampledSize;
+    LOG.info("Calculated records-per-byte ratio: {:.6f}", recordsPerByte);
+
+    // Estimate total records using ALL file slice base file sizes
+    long totalBaseFileSize = partitionFileSliceList.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    long estimatedTotal = (long) Math.ceil(totalBaseFileSize * recordsPerByte);
+    LOG.info("Estimated total record count: {} based on {} total base file 
bytes", estimatedTotal, totalBaseFileSize);
+
+    return estimatedTotal;
+  }
+
+  /**
+   * Counts records from file slices without constructing RLI records.
+   * This is a lightweight operation used only for estimation purposes.
+   * For MOR tables, only reads from base files (not log files) for faster 
estimation.
+   *
+   * @param partitionFileSlicePairs list of partition and file slice pairs
+   * @return Pair of (total record count, total base file size actually 
sampled)
+   */
+  private <T> Pair<Long, Long> countRecordsFromFileSlices(List<Pair<String, 
FileSlice>> partitionFileSlicePairs) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return Pair.of(0L, 0L);
+    }
+
+    // Filter to only file slices with base files
+    List<Pair<String, FileSlice>> fileSlicesWithBaseFiles = 
partitionFileSlicePairs.stream()
+        .filter(p -> p.getValue().getBaseFile().isPresent())
+        .collect(Collectors.toList());
+
+    if (fileSlicesWithBaseFiles.isEmpty()) {
+      LOG.warn("No file slices with base files found for record count 
estimation");
+      return Pair.of(0L, 0L);
+    }
+
+    // Calculate total base file size for the file slices we're actually 
counting
+    long totalBaseFileSize = fileSlicesWithBaseFiles.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    Option<String> instantTime = 
dataMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::requestedTime);
+    if (!instantTime.isPresent()) {
+      return Pair.of(0L, 0L);
+    }
+
+    final int parallelism = Math.min(fileSlicesWithBaseFiles.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(dataMetaClient);
+
+    // Parallel count operation - just iterate through base file records 
without constructing RLI records
+    // For MOR tables, we skip log files to keep estimation fast
+    long totalCount = engineContext.parallelize(fileSlicesWithBaseFiles, 
parallelism)
+        .map(partitionAndFileSlice -> {
+          final FileSlice fileSlice = partitionAndFileSlice.getValue();
+          final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+          HoodieReaderContext<T> readerContext = 
readerContextFactory.getContext();
+
+          try {
+            // Create a file slice with only the base file (no log files) for 
faster counting
+            FileSlice baseFileOnlySlice = new 
FileSlice(fileSlice.getPartitionPath(),
+                fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+            baseFileOnlySlice.setBaseFile(baseFile);
+
+            HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+                .withReaderContext(readerContext)
+                .withHoodieTableMetaClient(dataMetaClient)
+                .withFileSlice(baseFileOnlySlice)  // Base file only
+                .withLatestCommitTime(instantTime.get())
+                .withShouldUseRecordPosition(false)
+                .withProps(dataMetaClient.getTableConfig().getProps())
+                .build();
+            ClosableIterator closableIterator = 
fileGroupReader.getClosableKeyIterator();
+            // Just count records without materializing them
+            long count = 0;
+            while (closableIterator.hasNext()) {
+              closableIterator.next();
+              count++;
+            }
+            closableIterator.close();
+            fileGroupReader.close();
+            return count;
+          } catch (Exception e) {
+            LOG.warn("Failed to count records from file slice: " + 
fileSlice.getFileId(), e);
+            return 0L;
+          }
+        })
+        .sum();
+
+    return Pair.of(totalCount, totalBaseFileSize);
+  }
+
+  /**
+   * Gets the base file size of a file slice.
+   * For RLI initialization, we only consider base file size as that's where 
the record keys are stored.
+   *
+   * @param fileSlice the file slice
+   * @return base file size in bytes, or 0 if no base file present
+   */
+  private long getFileSliceSize(FileSlice fileSlice) {
+    if (fileSlice.getBaseFile().isPresent()) {
+      return fileSlice.getBaseFile().get().getFileSize();
+    }
+    return 0L;
+  }
+
+  /**
+   * Samples file slices uniformly from the given list.
+   *
+   * @param fileSliceList list of file slices to sample from
+   * @param sampleSize number of file slices to sample
+   * @return sampled list of file slices
+   */
+  private List<Pair<String, FileSlice>> 
sampleFileSlicesUniformly(List<Pair<String, FileSlice>> fileSliceList,
+                                                                    int 
sampleSize) {
+    if (sampleSize >= fileSliceList.size()) {
+      return fileSliceList;
+    }
+
+    List<Pair<String, FileSlice>> sampled = new ArrayList<>(sampleSize);
+    double step = (double) fileSliceList.size() / sampleSize;
+
+    for (int i = 0; i < sampleSize; i++) {
+      int index = (int) Math.floor(i * step);
+      sampled.add(fileSliceList.get(index));
+    }

Review Comment:
   IIUC, this picks evenly-spaced indices from the list. If file slices are 
ordered by partition (which they typically are), this systematic sampling may 
over-represent some partitions and under-represent others. For heterogeneous 
partition profiles (different record sizes, compression), this could skew the 
records-per-byte estimate. A shuffled or random sample would be more 
statistically robust.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java:
##########
@@ -123,6 +123,13 @@ public long count() {
     return rddData.count();
   }
 
+  @Override
+  public long sum() {
+    return rddData
+        .map(obj -> (Long) obj)
+        .reduce(Long::sum);
+  }

Review Comment:
   Does this handle an empty RDD? (Spark's `reduce` throws on an empty RDD, 
right?)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -847,6 +887,195 @@ private int 
estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Estimates total record count using size-weighted sampling:
+   * 1. Sample a fraction of file slices and count their actual records 
(lightweight - no RLI record construction)
+   * 2. Calculate average records-per-byte ratio from samples
+   * 3. Use file sizes of ALL file slices to estimate total records
+   *
+   * If total file slices are below threshold, counts all file slices directly 
without sampling.
+   * Threshold: 50 for global RLI, 10 for partitioned RLI.
+   *
+   * @param partitionFileSliceList list of partition and file slice pairs
+   * @param samplingFraction fraction of file slices to sample
+   * @return estimated total record count
+   */
+  private long estimateRecordCountBySizeWeightedSampling(List<Pair<String, 
FileSlice>> partitionFileSliceList,
+                                                          double 
samplingFraction) {
+    if (partitionFileSliceList.isEmpty()) {
+      return 0L;
+    }
+
+    int totalFileSlices = partitionFileSliceList.size();
+
+    // Determine threshold based on RLI type
+    // Global RLI: higher threshold (50) since it's one big partition
+    // Partitioned RLI: lower threshold (10) since we process per partition
+    int samplingThreshold = dataWriteConfig.isRecordLevelIndexEnabled() ? 10 : 
50;
+
+    // For small number of file slices, skip sampling and count all
+    if (totalFileSlices <= samplingThreshold) {
+      LOG.info("Total file slices ({}) <= threshold ({}), counting all file 
slices without sampling",
+          totalFileSlices, samplingThreshold);
+      Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(partitionFileSliceList);
+      return countAndSize.getLeft();
+    }
+
+    int sampleSize = Math.max(1, (int) Math.ceil(totalFileSlices * 
samplingFraction));
+
+    LOG.info("Sampling {} out of {} file slices ({:.1f}%) to estimate record 
count",
+        sampleSize, totalFileSlices, samplingFraction * 100);
+
+    // Sample file slices uniformly
+    List<Pair<String, FileSlice>> sampledFileSlices = 
sampleFileSlicesUniformly(partitionFileSliceList, sampleSize);
+
+    // Count records from sampled file slices (lightweight - just counting, no 
RLI record construction)
+    // Returns: (recordCount, actualSampledBaseFileSize) - only counts file 
slices with base files
+    Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(sampledFileSlices);
+    long sampledRecordCount = countAndSize.getLeft();
+    long actualSampledSize = countAndSize.getRight();
+
+    LOG.info("Counted {} records from {} bytes of actual sampled base files", 
sampledRecordCount, actualSampledSize);
+
+    if (actualSampledSize == 0) {
+      LOG.warn("No base files found in sampled file slices, returning 0 as 
estimate");
+      return 0L;
+    }
+
+    double recordsPerByte = (double) sampledRecordCount / actualSampledSize;
+    LOG.info("Calculated records-per-byte ratio: {:.6f}", recordsPerByte);
+
+    // Estimate total records using ALL file slice base file sizes
+    long totalBaseFileSize = partitionFileSliceList.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    long estimatedTotal = (long) Math.ceil(totalBaseFileSize * recordsPerByte);
+    LOG.info("Estimated total record count: {} based on {} total base file 
bytes", estimatedTotal, totalBaseFileSize);
+
+    return estimatedTotal;
+  }
+
+  /**
+   * Counts records from file slices without constructing RLI records.
+   * This is a lightweight operation used only for estimation purposes.
+   * For MOR tables, only reads from base files (not log files) for faster 
estimation.
+   *
+   * @param partitionFileSlicePairs list of partition and file slice pairs
+   * @return Pair of (total record count, total base file size actually 
sampled)
+   */
+  private <T> Pair<Long, Long> countRecordsFromFileSlices(List<Pair<String, 
FileSlice>> partitionFileSlicePairs) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return Pair.of(0L, 0L);
+    }
+
+    // Filter to only file slices with base files
+    List<Pair<String, FileSlice>> fileSlicesWithBaseFiles = 
partitionFileSlicePairs.stream()
+        .filter(p -> p.getValue().getBaseFile().isPresent())
+        .collect(Collectors.toList());
+
+    if (fileSlicesWithBaseFiles.isEmpty()) {
+      LOG.warn("No file slices with base files found for record count 
estimation");
+      return Pair.of(0L, 0L);
+    }
+
+    // Calculate total base file size for the file slices we're actually 
counting
+    long totalBaseFileSize = fileSlicesWithBaseFiles.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    Option<String> instantTime = 
dataMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::requestedTime);
+    if (!instantTime.isPresent()) {
+      return Pair.of(0L, 0L);
+    }
+
+    final int parallelism = Math.min(fileSlicesWithBaseFiles.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(dataMetaClient);
+
+    // Parallel count operation - just iterate through base file records 
without constructing RLI records
+    // For MOR tables, we skip log files to keep estimation fast
+    long totalCount = engineContext.parallelize(fileSlicesWithBaseFiles, 
parallelism)
+        .map(partitionAndFileSlice -> {
+          final FileSlice fileSlice = partitionAndFileSlice.getValue();
+          final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+          HoodieReaderContext<T> readerContext = 
readerContextFactory.getContext();
+
+          try {
+            // Create a file slice with only the base file (no log files) for 
faster counting
+            FileSlice baseFileOnlySlice = new 
FileSlice(fileSlice.getPartitionPath(),
+                fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+            baseFileOnlySlice.setBaseFile(baseFile);
+
+            HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+                .withReaderContext(readerContext)
+                .withHoodieTableMetaClient(dataMetaClient)
+                .withFileSlice(baseFileOnlySlice)  // Base file only
+                .withLatestCommitTime(instantTime.get())
+                .withShouldUseRecordPosition(false)
+                .withProps(dataMetaClient.getTableConfig().getProps())
+                .build();
+            ClosableIterator closableIterator = 
fileGroupReader.getClosableKeyIterator();
+            // Just count records without materializing them
+            long count = 0;
+            while (closableIterator.hasNext()) {
+              closableIterator.next();
+              count++;
+            }
+            closableIterator.close();
+            fileGroupReader.close();

Review Comment:
   Better to close these in a finally/try-with-resources block, so the 
iterator and the file group reader are released even if iteration throws.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -847,6 +887,195 @@ private int 
estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Estimates total record count using size-weighted sampling:
+   * 1. Sample a fraction of file slices and count their actual records 
(lightweight - no RLI record construction)
+   * 2. Calculate average records-per-byte ratio from samples
+   * 3. Use file sizes of ALL file slices to estimate total records
+   *
+   * If total file slices are below threshold, counts all file slices directly 
without sampling.
+   * Threshold: 50 for global RLI, 10 for partitioned RLI.
+   *
+   * @param partitionFileSliceList list of partition and file slice pairs
+   * @param samplingFraction fraction of file slices to sample
+   * @return estimated total record count
+   */
+  private long estimateRecordCountBySizeWeightedSampling(List<Pair<String, 
FileSlice>> partitionFileSliceList,
+                                                          double 
samplingFraction) {
+    if (partitionFileSliceList.isEmpty()) {
+      return 0L;
+    }
+
+    int totalFileSlices = partitionFileSliceList.size();
+
+    // Determine threshold based on RLI type
+    // Global RLI: higher threshold (50) since it's one big partition
+    // Partitioned RLI: lower threshold (10) since we process per partition
+    int samplingThreshold = dataWriteConfig.isRecordLevelIndexEnabled() ? 10 : 
50;
+
+    // For small number of file slices, skip sampling and count all
+    if (totalFileSlices <= samplingThreshold) {
+      LOG.info("Total file slices ({}) <= threshold ({}), counting all file 
slices without sampling",
+          totalFileSlices, samplingThreshold);
+      Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(partitionFileSliceList);
+      return countAndSize.getLeft();
+    }
+
+    int sampleSize = Math.max(1, (int) Math.ceil(totalFileSlices * 
samplingFraction));

Review Comment:
   IIUC, for partitioned RLI, at 11 file slices, the code switches from 
counting all 11 to sampling `ceil(11 * 0.1)` = 2 out of 11. That's a huge drop 
in sample size at the boundary. Is that ok?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -847,6 +887,195 @@ private int 
estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Estimates total record count using size-weighted sampling:
+   * 1. Sample a fraction of file slices and count their actual records 
(lightweight - no RLI record construction)
+   * 2. Calculate average records-per-byte ratio from samples
+   * 3. Use file sizes of ALL file slices to estimate total records
+   *
+   * If total file slices are below threshold, counts all file slices directly 
without sampling.
+   * Threshold: 50 for global RLI, 10 for partitioned RLI.
+   *
+   * @param partitionFileSliceList list of partition and file slice pairs
+   * @param samplingFraction fraction of file slices to sample
+   * @return estimated total record count
+   */
+  private long estimateRecordCountBySizeWeightedSampling(List<Pair<String, 
FileSlice>> partitionFileSliceList,
+                                                          double 
samplingFraction) {
+    if (partitionFileSliceList.isEmpty()) {
+      return 0L;
+    }
+
+    int totalFileSlices = partitionFileSliceList.size();
+
+    // Determine threshold based on RLI type
+    // Global RLI: higher threshold (50) since it's one big partition
+    // Partitioned RLI: lower threshold (10) since we process per partition
+    int samplingThreshold = dataWriteConfig.isRecordLevelIndexEnabled() ? 10 : 
50;
+
+    // For small number of file slices, skip sampling and count all
+    if (totalFileSlices <= samplingThreshold) {
+      LOG.info("Total file slices ({}) <= threshold ({}), counting all file 
slices without sampling",
+          totalFileSlices, samplingThreshold);
+      Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(partitionFileSliceList);
+      return countAndSize.getLeft();
+    }
+
+    int sampleSize = Math.max(1, (int) Math.ceil(totalFileSlices * 
samplingFraction));
+
+    LOG.info("Sampling {} out of {} file slices ({:.1f}%) to estimate record 
count",
+        sampleSize, totalFileSlices, samplingFraction * 100);
+
+    // Sample file slices uniformly
+    List<Pair<String, FileSlice>> sampledFileSlices = 
sampleFileSlicesUniformly(partitionFileSliceList, sampleSize);
+
+    // Count records from sampled file slices (lightweight - just counting, no 
RLI record construction)
+    // Returns: (recordCount, actualSampledBaseFileSize) - only counts file 
slices with base files
+    Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(sampledFileSlices);
+    long sampledRecordCount = countAndSize.getLeft();
+    long actualSampledSize = countAndSize.getRight();
+
+    LOG.info("Counted {} records from {} bytes of actual sampled base files", 
sampledRecordCount, actualSampledSize);
+
+    if (actualSampledSize == 0) {
+      LOG.warn("No base files found in sampled file slices, returning 0 as 
estimate");
+      return 0L;
+    }
+
+    double recordsPerByte = (double) sampledRecordCount / actualSampledSize;
+    LOG.info("Calculated records-per-byte ratio: {:.6f}", recordsPerByte);
+
+    // Estimate total records using ALL file slice base file sizes
+    long totalBaseFileSize = partitionFileSliceList.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    long estimatedTotal = (long) Math.ceil(totalBaseFileSize * recordsPerByte);
+    LOG.info("Estimated total record count: {} based on {} total base file 
bytes", estimatedTotal, totalBaseFileSize);
+
+    return estimatedTotal;
+  }
+
+  /**
+   * Counts records from file slices without constructing RLI records.
+   * This is a lightweight operation used only for estimation purposes.
+   * For MOR tables, only reads from base files (not log files) for faster 
estimation.
+   *
+   * @param partitionFileSlicePairs list of partition and file slice pairs
+   * @return Pair of (total record count, total base file size actually 
sampled)
+   */
+  private <T> Pair<Long, Long> countRecordsFromFileSlices(List<Pair<String, 
FileSlice>> partitionFileSlicePairs) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return Pair.of(0L, 0L);
+    }
+
+    // Filter to only file slices with base files
+    List<Pair<String, FileSlice>> fileSlicesWithBaseFiles = 
partitionFileSlicePairs.stream()
+        .filter(p -> p.getValue().getBaseFile().isPresent())
+        .collect(Collectors.toList());
+
+    if (fileSlicesWithBaseFiles.isEmpty()) {
+      LOG.warn("No file slices with base files found for record count 
estimation");
+      return Pair.of(0L, 0L);
+    }
+
+    // Calculate total base file size for the file slices we're actually 
counting
+    long totalBaseFileSize = fileSlicesWithBaseFiles.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    Option<String> instantTime = 
dataMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::requestedTime);
+    if (!instantTime.isPresent()) {
+      return Pair.of(0L, 0L);
+    }
+
+    final int parallelism = Math.min(fileSlicesWithBaseFiles.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(dataMetaClient);
+
+    // Parallel count operation - just iterate through base file records 
without constructing RLI records
+    // For MOR tables, we skip log files to keep estimation fast
+    long totalCount = engineContext.parallelize(fileSlicesWithBaseFiles, 
parallelism)
+        .map(partitionAndFileSlice -> {
+          final FileSlice fileSlice = partitionAndFileSlice.getValue();
+          final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+          HoodieReaderContext<T> readerContext = 
readerContextFactory.getContext();
+
+          try {
+            // Create a file slice with only the base file (no log files) for 
faster counting
+            FileSlice baseFileOnlySlice = new 
FileSlice(fileSlice.getPartitionPath(),
+                fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+            baseFileOnlySlice.setBaseFile(baseFile);
+
+            HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+                .withReaderContext(readerContext)
+                .withHoodieTableMetaClient(dataMetaClient)
+                .withFileSlice(baseFileOnlySlice)  // Base file only
+                .withLatestCommitTime(instantTime.get())
+                .withShouldUseRecordPosition(false)
+                .withProps(dataMetaClient.getTableConfig().getProps())
+                .build();

Review Comment:
   don't we need to pass dataSchema/requestedSchema as well?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -806,6 +807,26 @@ private Pair<Integer, HoodieData<HoodieRecord>> 
initializeRecordIndexPartition(
       List<Pair<String, FileSlice>> latestMergedPartitionFileSliceList,
       int recordIndexMaxParallelism) {
     LOG.info("Initializing record index from {} file slices", 
latestMergedPartitionFileSliceList.size());
+
+    // Get min/max file group count bounds
+    Pair<Integer, Integer> bounds = getRLIFileGroupCountBounds();
+    int minFileGroupCount = bounds.getLeft();
+    int maxFileGroupCount = bounds.getRight();
+
+    int fileGroupCount;
+    // Use sampling-based estimation if min != max (dynamic sizing expected)
+    if (minFileGroupCount != maxFileGroupCount) {
+      // Estimate file group count using sampling (10% of file slices)
+      fileGroupCount = 
estimateFileGroupCountBySampling(latestMergedPartitionFileSliceList, 0.1,

Review Comment:
   Should `samplingFraction` be an internal advanced config instead of 
hard-coded 0.1?



##########
hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java:
##########
@@ -238,6 +238,14 @@ public long count() {
     return super.count();
   }
 
+  @Override
+  public long sum() {
+    return asStream()
+        .map(obj -> (Long) obj)
+        .mapToLong(Long::longValue)
+        .sum();
+  }

Review Comment:
   I'm not sure if this is the right API choice. Adding a type-unsafe `sum()` 
(casts elements to Long, throws `ClassCastException` otherwise) to the core 
`HoodieData` interface is a permanent public API expansion for a single 
internal use. The same result could be achieved with: 
   ```
     long totalCount = engineContext.parallelize(fileSlices, parallelism)
         .map(/* count logic */)
         .collectAsList().stream().mapToLong(Long::longValue).sum();
   ```
   The list is small (one Long per counted file slice), so collecting is 
cheap. This avoids expanding the interface.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -847,6 +887,195 @@ private int 
estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Estimates total record count using size-weighted sampling:
+   * 1. Sample a fraction of file slices and count their actual records 
(lightweight - no RLI record construction)
+   * 2. Calculate average records-per-byte ratio from samples
+   * 3. Use file sizes of ALL file slices to estimate total records
+   *
+   * If total file slices are below threshold, counts all file slices directly 
without sampling.
+   * Threshold: 50 for global RLI, 10 for partitioned RLI.
+   *
+   * @param partitionFileSliceList list of partition and file slice pairs
+   * @param samplingFraction fraction of file slices to sample
+   * @return estimated total record count
+   */
+  private long estimateRecordCountBySizeWeightedSampling(List<Pair<String, 
FileSlice>> partitionFileSliceList,
+                                                          double 
samplingFraction) {
+    if (partitionFileSliceList.isEmpty()) {
+      return 0L;
+    }
+
+    int totalFileSlices = partitionFileSliceList.size();
+
+    // Determine threshold based on RLI type
+    // Global RLI: higher threshold (50) since it's one big partition
+    // Partitioned RLI: lower threshold (10) since we process per partition
+    int samplingThreshold = dataWriteConfig.isRecordLevelIndexEnabled() ? 10 : 
50;
+
+    // For small number of file slices, skip sampling and count all
+    if (totalFileSlices <= samplingThreshold) {
+      LOG.info("Total file slices ({}) <= threshold ({}), counting all file 
slices without sampling",
+          totalFileSlices, samplingThreshold);
+      Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(partitionFileSliceList);
+      return countAndSize.getLeft();
+    }
+
+    int sampleSize = Math.max(1, (int) Math.ceil(totalFileSlices * 
samplingFraction));
+
+    LOG.info("Sampling {} out of {} file slices ({:.1f}%) to estimate record 
count",
+        sampleSize, totalFileSlices, samplingFraction * 100);
+
+    // Sample file slices uniformly
+    List<Pair<String, FileSlice>> sampledFileSlices = 
sampleFileSlicesUniformly(partitionFileSliceList, sampleSize);
+
+    // Count records from sampled file slices (lightweight - just counting, no 
RLI record construction)
+    // Returns: (recordCount, actualSampledBaseFileSize) - only counts file 
slices with base files
+    Pair<Long, Long> countAndSize = 
countRecordsFromFileSlices(sampledFileSlices);
+    long sampledRecordCount = countAndSize.getLeft();
+    long actualSampledSize = countAndSize.getRight();
+
+    LOG.info("Counted {} records from {} bytes of actual sampled base files", 
sampledRecordCount, actualSampledSize);
+
+    if (actualSampledSize == 0) {
+      LOG.warn("No base files found in sampled file slices, returning 0 as 
estimate");
+      return 0L;
+    }
+
+    double recordsPerByte = (double) sampledRecordCount / actualSampledSize;
+    LOG.info("Calculated records-per-byte ratio: {:.6f}", recordsPerByte);
+
+    // Estimate total records using ALL file slice base file sizes
+    long totalBaseFileSize = partitionFileSliceList.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    long estimatedTotal = (long) Math.ceil(totalBaseFileSize * recordsPerByte);
+    LOG.info("Estimated total record count: {} based on {} total base file 
bytes", estimatedTotal, totalBaseFileSize);
+
+    return estimatedTotal;
+  }
+
+  /**
+   * Counts records from file slices without constructing RLI records.
+   * This is a lightweight operation used only for estimation purposes.
+   * For MOR tables, only reads from base files (not log files) for faster 
estimation.
+   *
+   * @param partitionFileSlicePairs list of partition and file slice pairs
+   * @return Pair of (total record count, total base file size actually 
sampled)
+   */
+  private <T> Pair<Long, Long> countRecordsFromFileSlices(List<Pair<String, 
FileSlice>> partitionFileSlicePairs) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return Pair.of(0L, 0L);
+    }
+
+    // Filter to only file slices with base files
+    List<Pair<String, FileSlice>> fileSlicesWithBaseFiles = 
partitionFileSlicePairs.stream()
+        .filter(p -> p.getValue().getBaseFile().isPresent())
+        .collect(Collectors.toList());
+
+    if (fileSlicesWithBaseFiles.isEmpty()) {
+      LOG.warn("No file slices with base files found for record count 
estimation");
+      return Pair.of(0L, 0L);
+    }
+
+    // Calculate total base file size for the file slices we're actually 
counting
+    long totalBaseFileSize = fileSlicesWithBaseFiles.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    Option<String> instantTime = 
dataMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::requestedTime);
+    if (!instantTime.isPresent()) {
+      return Pair.of(0L, 0L);
+    }
+
+    final int parallelism = Math.min(fileSlicesWithBaseFiles.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(dataMetaClient);
+
+    // Parallel count operation - just iterate through base file records 
without constructing RLI records
+    // For MOR tables, we skip log files to keep estimation fast
+    long totalCount = engineContext.parallelize(fileSlicesWithBaseFiles, 
parallelism)
+        .map(partitionAndFileSlice -> {
+          final FileSlice fileSlice = partitionAndFileSlice.getValue();
+          final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+          HoodieReaderContext<T> readerContext = 
readerContextFactory.getContext();
+
+          try {
+            // Create a file slice with only the base file (no log files) for 
faster counting
+            FileSlice baseFileOnlySlice = new 
FileSlice(fileSlice.getPartitionPath(),
+                fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+            baseFileOnlySlice.setBaseFile(baseFile);
+
+            HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+                .withReaderContext(readerContext)
+                .withHoodieTableMetaClient(dataMetaClient)
+                .withFileSlice(baseFileOnlySlice)  // Base file only
+                .withLatestCommitTime(instantTime.get())
+                .withShouldUseRecordPosition(false)
+                .withProps(dataMetaClient.getTableConfig().getProps())
+                .build();
+            ClosableIterator closableIterator = 
fileGroupReader.getClosableKeyIterator();
+            // Just count records without materializing them
+            long count = 0;
+            while (closableIterator.hasNext()) {
+              closableIterator.next();
+              count++;
+            }

Review Comment:
   If this is cheap enough to do for a fraction of the files, what's the cost 
of doing it for all files? I think we should include benchmark data showing that 
even lightweight key iteration over all files is prohibitively slow. Without 
that data, sampling introduces estimation error for unclear benefit. For 
instance, why can't we simply read the row count from footer metadata?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to