nsivabalan commented on code in PR #14039:
URL: https://github.com/apache/hudi/pull/14039#discussion_r2399918025


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java:
##########
@@ -66,68 +65,44 @@ public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
   @Override
   public long averageBytesPerRecord(HoodieTimeline commitTimeline, CommitMetadataSerDe commitMetadataSerDe) {
     int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
-    final AverageRecordSizeStats averageRecordSizeStats = new AverageRecordSizeStats(hoodieWriteConfig);
+    final long commitSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());

Review Comment:
   Isn't this a file-slice threshold, or a single-data-file threshold, rather than a commit-size threshold?
   
   It looks like this was a bug earlier, and we should fix it now.
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java:
##########
@@ -66,68 +65,44 @@ public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
   @Override
   public long averageBytesPerRecord(HoodieTimeline commitTimeline, CommitMetadataSerDe commitMetadataSerDe) {
     int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
-    final AverageRecordSizeStats averageRecordSizeStats = new AverageRecordSizeStats(hoodieWriteConfig);
+    final long commitSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
+    final long metadataSizeEstimate = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
     try {
       if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Stream<HoodieInstant> filteredInstants = commitTimeline.filterCompletedInstants()
+        Iterator<HoodieInstant> instants = commitTimeline.filterCompletedInstants()
             .getReverseOrderedInstants()
             .filter(s -> RECORD_SIZE_ESTIMATE_ACTIONS.contains(s.getAction()))
-            .limit(maxCommits);
-        filteredInstants
-            .forEach(instant -> {
-              HoodieCommitMetadata commitMetadata;
-              try {
-                commitMetadata = commitTimeline.readCommitMetadata(instant);
-                if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
-                  // let's consider only base files in case of delta commits
-                  commitMetadata.getWriteStats().stream().parallel()
-                      .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
-                      .forEach(hoodieWriteStat -> averageRecordSizeStats.updateStats(hoodieWriteStat.getTotalWriteBytes(), hoodieWriteStat.getNumWrites()));
-                } else {
-                  averageRecordSizeStats.updateStats(commitMetadata.fetchTotalBytesWritten(), commitMetadata.fetchTotalRecordsWritten());
-                }
-              } catch (IOException ignore) {
-                LOG.info("Failed to parse commit metadata", ignore);
-              }
-            });
+            .limit(maxCommits).iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          try {
+            HoodieCommitMetadata commitMetadata = commitTimeline.readCommitMetadata(instant);
+            final HoodieAtomicLongAccumulator totalBytesWritten = HoodieAtomicLongAccumulator.create();

Review Comment:
   Not sure why we need an accumulator here; from what I can gauge, we are processing all of this on the driver.
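   
   A minimal sketch of the single-threaded alternative, assuming this keeps running inside the driver-side loop over instants (variable names mirror the PR, but this is not the actual change):
   
   ```java
   // Hypothetical sketch: the loop is sequential on the driver, so plain longs suffice.
   long totalBytesWritten = 0L;
   long totalRecordsWritten = 0L;
   for (HoodieWriteStat stat : commitMetadata.getWriteStats()) {
     // Same base-file filter as the delta-commit branch in the PR.
     if (FSUtils.isBaseFile(new StoragePath(stat.getPath()))) {
       totalBytesWritten += stat.getTotalWriteBytes() - metadataSizeEstimate;
       totalRecordsWritten += stat.getNumWrites();
     }
   }
   ```
   
   A plain loop also sidesteps the effectively-final restriction that pushes accumulator objects into lambdas.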
   



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java:
##########
@@ -66,68 +65,44 @@ public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
   @Override
   public long averageBytesPerRecord(HoodieTimeline commitTimeline, CommitMetadataSerDe commitMetadataSerDe) {
     int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
-    final AverageRecordSizeStats averageRecordSizeStats = new AverageRecordSizeStats(hoodieWriteConfig);
+    final long commitSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit());
+    final long metadataSizeEstimate = hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
     try {
       if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate of average record size.
-        Stream<HoodieInstant> filteredInstants = commitTimeline.filterCompletedInstants()
+        Iterator<HoodieInstant> instants = commitTimeline.filterCompletedInstants()
             .getReverseOrderedInstants()
             .filter(s -> RECORD_SIZE_ESTIMATE_ACTIONS.contains(s.getAction()))
-            .limit(maxCommits);
-        filteredInstants
-            .forEach(instant -> {
-              HoodieCommitMetadata commitMetadata;
-              try {
-                commitMetadata = commitTimeline.readCommitMetadata(instant);
-                if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
-                  // let's consider only base files in case of delta commits
-                  commitMetadata.getWriteStats().stream().parallel()
-                      .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
-                      .forEach(hoodieWriteStat -> averageRecordSizeStats.updateStats(hoodieWriteStat.getTotalWriteBytes(), hoodieWriteStat.getNumWrites()));
-                } else {
-                  averageRecordSizeStats.updateStats(commitMetadata.fetchTotalBytesWritten(), commitMetadata.fetchTotalRecordsWritten());
-                }
-              } catch (IOException ignore) {
-                LOG.info("Failed to parse commit metadata", ignore);
-              }
-            });
+            .limit(maxCommits).iterator();
+        while (instants.hasNext()) {
+          HoodieInstant instant = instants.next();
+          try {
+            HoodieCommitMetadata commitMetadata = commitTimeline.readCommitMetadata(instant);
+            final HoodieAtomicLongAccumulator totalBytesWritten = HoodieAtomicLongAccumulator.create();
+            final HoodieAtomicLongAccumulator totalRecordsWritten = HoodieAtomicLongAccumulator.create();
+            if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+              // Only use base files for estimate
+              commitMetadata.getWriteStats().stream()
+                  .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath())))
+                  .forEach(hoodieWriteStat -> {
+                    totalBytesWritten.add(hoodieWriteStat.getTotalWriteBytes() - metadataSizeEstimate);
+                    totalRecordsWritten.add(hoodieWriteStat.getNumWrites());
+                  });
+            } else {
+              totalBytesWritten.add(commitMetadata.fetchTotalBytesWritten() - (commitMetadata.fetchTotalFiles() * metadataSizeEstimate));
Review Comment:
   If we go with a per-file size threshold, then here too we need to loop over every writeStat.
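   
   Something along these lines, as a sketch (reusing the hypothetical `fileSizeThreshold` from the earlier comment; not code from the PR):
   
   ```java
   // Hypothetical sketch: iterate write stats for every action type so the
   // per-file threshold and metadata adjustment apply uniformly.
   for (HoodieWriteStat stat : commitMetadata.getWriteStats()) {
     // For delta commits, only base files count toward the estimate.
     if (instant.getAction().equals(DELTA_COMMIT_ACTION)
         && !FSUtils.isBaseFile(new StoragePath(stat.getPath()))) {
       continue;
     }
     if (stat.getTotalWriteBytes() > fileSizeThreshold) {
       totalBytesWritten += stat.getTotalWriteBytes() - metadataSizeEstimate;
       totalRecordsWritten += stat.getNumWrites();
     }
   }
   ```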



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
