danny0405 commented on code in PR #18354:
URL: https://github.com/apache/hudi/pull/18354#discussion_r3007064628
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java:
##########
@@ -847,6 +887,195 @@ private int
estimateFileGroupCount(HoodieData<HoodieRecord> records) {
);
}
+ /**
+  * Estimates the total record count using size-weighted sampling:
+  * 1. Sample a fraction of file slices and count their actual records (lightweight - no RLI record construction)
+  * 2. Calculate the average records-per-byte ratio from the samples
+  * 3. Use the file sizes of ALL file slices to estimate the total number of records
+  *
+  * If the number of file slices is at or below the threshold, all file slices are counted directly
+  * without sampling. Threshold: 50 for global RLI, 10 for partitioned RLI.
+  *
+  * @param partitionFileSliceList list of partition and file slice pairs
+  * @param samplingFraction fraction of file slices to sample
+  * @return estimated total record count; 0 when the list is empty or when the sampled file slices
+  *         report zero base file bytes
+  */
+ private long estimateRecordCountBySizeWeightedSampling(List<Pair<String, FileSlice>> partitionFileSliceList,
+                                                        double samplingFraction) {
+   if (partitionFileSliceList.isEmpty()) {
+     return 0L;
+   }
+
+   int totalFileSlices = partitionFileSliceList.size();
+
+   // Determine threshold based on RLI type
+   // Global RLI: higher threshold (50) since it's one big partition
+   // Partitioned RLI: lower threshold (10) since we process per partition
+   // NOTE(review): this assumes isRecordLevelIndexEnabled() is true for the partitioned RLI - confirm
+   int samplingThreshold = dataWriteConfig.isRecordLevelIndexEnabled() ? 10 : 50;
+
+   // For small number of file slices, skip sampling and count all
+   if (totalFileSlices <= samplingThreshold) {
+     LOG.info("Total file slices ({}) <= threshold ({}), counting all file slices without sampling",
+         totalFileSlices, samplingThreshold);
+     Pair<Long, Long> countAndSize = countRecordsFromFileSlices(partitionFileSliceList);
+     return countAndSize.getLeft();
+   }
+
+   // At least one slice is sampled; the sample can never be larger than the population,
+   // even if the configured fraction is above 1.0.
+   int sampleSize = Math.min(totalFileSlices,
+       Math.max(1, (int) Math.ceil(totalFileSlices * samplingFraction)));
+
+   // Parameterized log messages substitute only the literal "{}" anchor. An anchor such as
+   // "{:.1f}" is printed verbatim and its argument is dropped, so the number is formatted up front.
+   LOG.info("Sampling {} out of {} file slices ({}%) to estimate record count",
+       sampleSize, totalFileSlices, String.format("%.1f", samplingFraction * 100));
+
+   // Sample file slices uniformly
+   List<Pair<String, FileSlice>> sampledFileSlices = sampleFileSlicesUniformly(partitionFileSliceList, sampleSize);
+
+   // Count records from sampled file slices (lightweight - just counting, no RLI record construction)
+   // Returns: (recordCount, actualSampledBaseFileSize) - only counts file slices with base files
+   Pair<Long, Long> countAndSize = countRecordsFromFileSlices(sampledFileSlices);
+   long sampledRecordCount = countAndSize.getLeft();
+   long actualSampledSize = countAndSize.getRight();
+
+   LOG.info("Counted {} records from {} bytes of actual sampled base files", sampledRecordCount, actualSampledSize);
+
+   // Guard against division by zero when none of the sampled slices contributed any bytes
+   if (actualSampledSize == 0) {
+     LOG.warn("No base files found in sampled file slices, returning 0 as estimate");
+     return 0L;
+   }
+
+   double recordsPerByte = (double) sampledRecordCount / actualSampledSize;
+   LOG.info("Calculated records-per-byte ratio: {}", String.format("%.6f", recordsPerByte));
+
+   // Estimate total records using ALL file slice base file sizes
+   long totalBaseFileSize = partitionFileSliceList.stream()
+       .mapToLong(p -> getFileSliceSize(p.getValue()))
+       .sum();
+
+   long estimatedTotal = (long) Math.ceil(totalBaseFileSize * recordsPerByte);
+   LOG.info("Estimated total record count: {} based on {} total base file bytes", estimatedTotal, totalBaseFileSize);
+
+   return estimatedTotal;
+ }
+
+
+ /**
+ * Counts records from file slices without constructing RLI records.
+ * This is a lightweight operation used only for estimation purposes.
+ * For MOR tables, only reads from base files (not log files) for faster
estimation.
+ *
+ * @param partitionFileSlicePairs list of partition and file slice pairs
+ * @return Pair of (total record count, total base file size actually
sampled)
+ */
+ private <T> Pair<Long, Long> countRecordsFromFileSlices(List<Pair<String,
FileSlice>> partitionFileSlicePairs) {
+ if (partitionFileSlicePairs.isEmpty()) {
+ return Pair.of(0L, 0L);
+ }
+
+ // Filter to only file slices with base files
+ List<Pair<String, FileSlice>> fileSlicesWithBaseFiles =
partitionFileSlicePairs.stream()
+ .filter(p -> p.getValue().getBaseFile().isPresent())
+ .collect(Collectors.toList());
+
+ if (fileSlicesWithBaseFiles.isEmpty()) {
+ LOG.warn("No file slices with base files found for record count
estimation");
+ return Pair.of(0L, 0L);
+ }
+
+ // Calculate total base file size for the file slices we're actually
counting
+ long totalBaseFileSize = fileSlicesWithBaseFiles.stream()
+ .mapToLong(p -> getFileSliceSize(p.getValue()))
+ .sum();
+
+ Option<String> instantTime =
dataMetaClient.getActiveTimeline().getCommitsTimeline()
+ .filterCompletedInstants()
+ .lastInstant()
+ .map(HoodieInstant::requestedTime);
+ if (!instantTime.isPresent()) {
+ return Pair.of(0L, 0L);
+ }
+
+ final int parallelism = Math.min(fileSlicesWithBaseFiles.size(),
+ dataWriteConfig.getMetadataConfig().getRecordIndexMaxParallelism());
+ ReaderContextFactory<T> readerContextFactory =
engineContext.getReaderContextFactory(dataMetaClient);
+
+ // Parallel count operation - just iterate through base file records
without constructing RLI records
+ // For MOR tables, we skip log files to keep estimation fast
+ long totalCount = engineContext.parallelize(fileSlicesWithBaseFiles,
parallelism)
+ .map(partitionAndFileSlice -> {
+ final FileSlice fileSlice = partitionAndFileSlice.getValue();
+ final HoodieBaseFile baseFile = fileSlice.getBaseFile().get();
+ HoodieReaderContext<T> readerContext =
readerContextFactory.getContext();
+
+ try {
+ // Create a file slice with only the base file (no log files) for
faster counting
+ FileSlice baseFileOnlySlice = new
FileSlice(fileSlice.getPartitionPath(),
+ fileSlice.getBaseInstantTime(), fileSlice.getFileId());
+ baseFileOnlySlice.setBaseFile(baseFile);
+
+ HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder()
+ .withReaderContext(readerContext)
+ .withHoodieTableMetaClient(dataMetaClient)
+ .withFileSlice(baseFileOnlySlice) // Base file only
+ .withLatestCommitTime(instantTime.get())
+ .withShouldUseRecordPosition(false)
+ .withProps(dataMetaClient.getTableConfig().getProps())
+ .build();
+ ClosableIterator closableIterator =
fileGroupReader.getClosableKeyIterator();
+ // Just count records without materializing them
+ long count = 0;
+ while (closableIterator.hasNext()) {
+ closableIterator.next();
+ count++;
+ }
Review Comment:
> For instance, why can't we simply read row count from footer metadata?
+1
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]