danny0405 commented on code in PR #18354:
URL: https://github.com/apache/hudi/pull/18354#discussion_r3007064628


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -847,6 +887,195 @@ private int 
estimateFileGroupCount(HoodieData<HoodieRecord> records) {
     );
   }
 
+  /**
+   * Estimates the total record count using size-weighted sampling:
+   * 1. Sample a fraction of file slices and count their actual records (lightweight - no RLI record construction)
+   * 2. Calculate the average records-per-byte ratio from the samples
+   * 3. Apply that ratio to the file sizes of ALL file slices to estimate total records
+   *
+   * If the number of file slices is at or below the threshold, all file slices are counted directly
+   * without sampling. Threshold: 50 for global RLI, 10 for partitioned RLI.
+   *
+   * Returns 0 when the input list is empty, or when the sampled file slices contribute zero bytes
+   * (no ratio can be derived in that case).
+   *
+   * @param partitionFileSliceList list of partition and file slice pairs
+   * @param samplingFraction fraction of file slices to sample; the derived sample size is clamped
+   *                         to the range [1, number of file slices]
+   * @return estimated total record count
+   */
+  private long estimateRecordCountBySizeWeightedSampling(List<Pair<String, FileSlice>> partitionFileSliceList,
+                                                          double samplingFraction) {
+    if (partitionFileSliceList.isEmpty()) {
+      return 0L;
+    }
+
+    int totalFileSlices = partitionFileSliceList.size();
+
+    // Determine threshold based on RLI type
+    // Global RLI: higher threshold (50) since it's one big partition
+    // Partitioned RLI: lower threshold (10) since we process per partition
+    // NOTE(review): this assumes isRecordLevelIndexEnabled() is true only for the partitioned RLI - confirm
+    int samplingThreshold = dataWriteConfig.isRecordLevelIndexEnabled() ? 10 : 50;
+
+    // For small number of file slices, skip sampling and count all
+    if (totalFileSlices <= samplingThreshold) {
+      LOG.info("Total file slices ({}) <= threshold ({}), counting all file slices without sampling",
+          totalFileSlices, samplingThreshold);
+      Pair<Long, Long> countAndSize = countRecordsFromFileSlices(partitionFileSliceList);
+      return countAndSize.getLeft();
+    }
+
+    // Clamp to [1, totalFileSlices]: a fraction > 1 must not request more samples than exist,
+    // and a tiny (or non-positive) fraction must still sample at least one file slice.
+    int sampleSize = Math.min(totalFileSlices,
+        Math.max(1, (int) Math.ceil(totalFileSlices * samplingFraction)));
+
+    // SLF4J only substitutes the literal "{}" placeholder; format specifiers such as "{:.1f}" are
+    // printed verbatim, so the number is pre-formatted and passed as a regular argument.
+    LOG.info("Sampling {} out of {} file slices ({}%) to estimate record count",
+        sampleSize, totalFileSlices, String.format("%.1f", samplingFraction * 100));
+
+    // Sample file slices uniformly
+    List<Pair<String, FileSlice>> sampledFileSlices = sampleFileSlicesUniformly(partitionFileSliceList, sampleSize);
+
+    // Count records from sampled file slices (lightweight - just counting, no RLI record construction)
+    // Returns: (recordCount, actualSampledBaseFileSize) - only counts file slices with base files
+    Pair<Long, Long> countAndSize = countRecordsFromFileSlices(sampledFileSlices);
+    long sampledRecordCount = countAndSize.getLeft();
+    long actualSampledSize = countAndSize.getRight();
+
+    LOG.info("Counted {} records from {} bytes of actual sampled base files", sampledRecordCount, actualSampledSize);
+
+    // Guard against division by zero when nothing countable was sampled
+    if (actualSampledSize == 0) {
+      LOG.warn("No base files found in sampled file slices, returning 0 as estimate");
+      return 0L;
+    }
+
+    double recordsPerByte = (double) sampledRecordCount / actualSampledSize;
+    LOG.info("Calculated records-per-byte ratio: {}", String.format("%.6f", recordsPerByte));
+
+    // Estimate total records using ALL file slice base file sizes
+    // NOTE(review): presumably getFileSliceSize() reports base file bytes only, matching the sampled size - confirm
+    long totalBaseFileSize = partitionFileSliceList.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    long estimatedTotal = (long) Math.ceil(totalBaseFileSize * recordsPerByte);
+    LOG.info("Estimated total record count: {} based on {} total base file bytes", estimatedTotal, totalBaseFileSize);
+
+    return estimatedTotal;
+  }
+
+  /**
+   * Counts records from file slices without constructing RLI records.
+   * This is a lightweight operation used only for estimation purposes.
+   * For MOR tables, only reads from base files (not log files) for faster 
estimation.
+   *
+   * @param partitionFileSlicePairs list of partition and file slice pairs
+   * @return Pair of (total record count, total base file size actually 
sampled)
+   */
+  private <T> Pair<Long, Long> countRecordsFromFileSlices(List<Pair<String, 
FileSlice>> partitionFileSlicePairs) {
+    if (partitionFileSlicePairs.isEmpty()) {
+      return Pair.of(0L, 0L);
+    }
+
+    // Filter to only file slices with base files
+    List<Pair<String, FileSlice>> fileSlicesWithBaseFiles = 
partitionFileSlicePairs.stream()
+        .filter(p -> p.getValue().getBaseFile().isPresent())
+        .collect(Collectors.toList());
+
+    if (fileSlicesWithBaseFiles.isEmpty()) {
+      LOG.warn("No file slices with base files found for record count 
estimation");
+      return Pair.of(0L, 0L);
+    }
+
+    // Calculate total base file size for the file slices we're actually 
counting
+    long totalBaseFileSize = fileSlicesWithBaseFiles.stream()
+        .mapToLong(p -> getFileSliceSize(p.getValue()))
+        .sum();
+
+    Option<String> instantTime = 
dataMetaClient.getActiveTimeline().getCommitsTimeline()
+        .filterCompletedInstants()
+        .lastInstant()
+        .map(HoodieInstant::requestedTime);
+    if (!instantTime.isPresent()) {
+      return Pair.of(0L, 0L);
+    }
+
+    final int parallelism = Math.min(fileSlicesWithBaseFiles.size(),
+        dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+    ReaderContextFactory<T> readerContextFactory = 
engineContext.getReaderContextFactory(dataMetaClient);
+
+    // Parallel count operation - just iterate through base file records 
without constructing RLI records
+    // For MOR tables, we skip log files to keep estimation fast
+    long totalCount = engineContext.parallelize(fileSlicesWithBaseFiles, 
parallelism)
+        .map(partitionAndFileSlice -> {
+          final FileSlice fileSlice = partitionAndFileSlice.getValue();
+          final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+          HoodieReaderContext<T> readerContext = 
readerContextFactory.getContext();
+
+          try {
+            // Create a file slice with only the base file (no log files) for 
faster counting
+            FileSlice baseFileOnlySlice = new 
FileSlice(fileSlice.getPartitionPath(),
+                fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+            baseFileOnlySlice.setBaseFile(baseFile);
+
+            HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
+                .withReaderContext(readerContext)
+                .withHoodieTableMetaClient(dataMetaClient)
+                .withFileSlice(baseFileOnlySlice)  // Base file only
+                .withLatestCommitTime(instantTime.get())
+                .withShouldUseRecordPosition(false)
+                .withProps(dataMetaClient.getTableConfig().getProps())
+                .build();
+            ClosableIterator closableIterator = 
fileGroupReader.getClosableKeyIterator();
+            // Just count records without materializing them
+            long count = 0;
+            while (closableIterator.hasNext()) {
+              closableIterator.next();
+              count++;
+            }

Review Comment:
   > For instance, why can't we simply read row count from footer metadata?
   
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to