This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e853440bac04 fix(flink): fix the mor small file record size estimation (#18991)
e853440bac04 is described below

commit e853440bac04f1ca9869a749d7f69d409d647ab3
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jun 16 17:01:20 2026 +0800

    fix(flink): fix the mor small file record size estimation (#18991)
---
 .../partitioner/profile/DeltaWriteProfile.java     |  36 ++++++-
 .../sink/partitioner/profile/WriteProfile.java     |  60 +++++++-----
 .../hudi/sink/partitioner/TestBucketAssigner.java  | 105 ++++++++++++++++++++-
 .../hudi/table/ITTestDynamicBucketStreamWrite.java |   7 +-
 4 files changed, 179 insertions(+), 29 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index f73adb37d337..a79c78e6d363 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -22,12 +22,15 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieBaseFile;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.view.SyncableFileSystemView;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.table.action.commit.SmallFile;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
@@ -38,7 +41,9 @@ import java.util.stream.Collectors;
  *
  * <p>Note: assumes the index can always index log files for Flink write.
  */
+@Slf4j
 public class DeltaWriteProfile extends WriteProfile {
+
   public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext 
context) {
     super(config, context);
   }
@@ -86,12 +91,34 @@ public class DeltaWriteProfile extends WriteProfile {
     return smallFileLocations;
   }
 
+  @Override
+  protected long averageBytesPerRecord() {
+    long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+    HoodieTimeline commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
+    if (!commitTimeline.empty()) {
+      long sizeFromCommitMetadata = 
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
+      if (sizeFromCommitMetadata > 0) {
+        avgSize = sizeFromCommitMetadata;
+      }
+    } else {
+      HoodieTimeline deltaCommitTimeline = 
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+      if (!deltaCommitTimeline.empty()) {
+        long sizeFromCommitMetadata = 
calculateRecordSizeThroughCommitMetadata(deltaCommitTimeline, 
logFileToParquetCompressionRatio());
+        if (sizeFromCommitMetadata > 0) {
+          avgSize = sizeFromCommitMetadata;
+        }
+      }
+    }
+    log.info("Refresh average bytes per record => " + avgSize);
+    return avgSize;
+  }
+
   protected SyncableFileSystemView getFileSystemView() {
     return (SyncableFileSystemView) getTable().getSliceView();
   }
 
   private long getTotalFileSize(FileSlice fileSlice) {
-    return 
fileSlice.getTotalFileSizeAsParquetFormat(config.getLogFileToParquetCompressionRatio());
+    return 
fileSlice.getTotalFileSizeAsParquetFormat(logFileToParquetCompressionRatio());
   }
 
   private boolean isSmallFile(FileSlice fileSlice) {
@@ -99,4 +126,11 @@ public class DeltaWriteProfile extends WriteProfile {
     return totalSize < config.getParquetMaxFileSize();
   }
 
+  private double logFileToParquetCompressionRatio() {
+    if (config.getLogDataBlockFormat().isPresent()
+        && config.getLogDataBlockFormat().get() == 
HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+      return 1D;
+    }
+    return config.getLogFileToParquetCompressionRatio();
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index cbcc71cedb6e..c3819bf08116 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -117,7 +117,6 @@ public class WriteProfile {
     this.context = context;
     this.basePath = new Path(config.getBasePath());
     this.smallFilesMap = new HashMap<>();
-    this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
     this.metaClient = StreamerUtil.createMetaClient(
         config.getBasePath(), 
context.getStorageConf().unwrapAs(Configuration.class));
     this.metadataCache = new HashMap<>();
@@ -134,35 +133,46 @@ public class WriteProfile {
    * Obtains the average record size based on records written during previous 
commits. Used for estimating how many
    * records pack into one file.
    */
-  private long averageBytesPerRecord() {
+  protected long averageBytesPerRecord() {
     long avgSize = config.getCopyOnWriteRecordSizeEstimate();
-    long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold() 
* config.getParquetSmallFileLimit());
-    HoodieTimeline commitTimeline = 
metaClient.getCommitsTimeline().filterCompletedInstants();
+    HoodieTimeline commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
     if (!commitTimeline.empty()) {
-      // Go over the reverse ordered commits to get a more recent estimate of 
average record size.
-      Iterator<HoodieInstant> instants = 
commitTimeline.getReverseOrderedInstants().iterator();
-      while (instants.hasNext()) {
-        HoodieInstant instant = instants.next();
-        final HoodieCommitMetadata commitMetadata =
-            this.metadataCache.computeIfAbsent(
-                instant.requestedTime(),
-                k -> 
WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, 
commitTimeline)
-                    .orElse(null));
-        if (commitMetadata == null) {
-          continue;
-        }
-        long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
-        long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
-        if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
-          avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / 
totalRecordsWritten);
-          break;
-        }
+      long sizeFromCommitMetadata = 
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
+      if (sizeFromCommitMetadata > 0) {
+        avgSize = sizeFromCommitMetadata;
       }
     }
     log.info("Refresh average bytes per record => " + avgSize);
     return avgSize;
   }
 
+  protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline 
commitTimeline, double fileSizeCalibrationRatio) {
+    long fileSizeThreshold = recordSizeEstimationFileSizeThreshold();
+    // Go over the reverse ordered commits to get a more recent estimate of 
average record size.
+    Iterator<HoodieInstant> instants = 
commitTimeline.getReverseOrderedInstants().iterator();
+    while (instants.hasNext()) {
+      HoodieInstant instant = instants.next();
+      final HoodieCommitMetadata commitMetadata =
+          this.metadataCache.computeIfAbsent(
+              instant.requestedTime(),
+              k -> 
WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant, 
commitTimeline)
+                  .orElse(null));
+      if (commitMetadata == null) {
+        continue;
+      }
+      long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+      long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+      if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+        return (long) Math.ceil((fileSizeCalibrationRatio * totalBytesWritten) 
/ totalRecordsWritten);
+      }
+    }
+    return -1L;
+  }
+
+  private long recordSizeEstimationFileSizeThreshold() {
+    return (long) (0.1D * config.getParquetSmallFileLimit());
+  }
+
   /**
    * Returns a list of small files in the given partition path.
    *
@@ -228,10 +238,8 @@ public class WriteProfile {
 
   private void recordProfile() {
     this.avgSize = averageBytesPerRecord();
-    if (config.shouldAllowMultiWriteOnSameInstant()) {
-      this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
-      log.info("Refresh insert records per bucket => " + recordsPerBucket);
-    }
+    this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
+    log.info("Refresh insert records per bucket => " + recordsPerBucket);
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 4f682084050b..8281ebbd86e4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -19,11 +19,16 @@
 package org.apache.hudi.sink.partitioner;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.HadoopConfigurations;
 import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile;
 import org.apache.hudi.sink.partitioner.profile.WriteProfile;
 import org.apache.hudi.table.action.commit.BucketInfo;
 import org.apache.hudi.table.action.commit.BucketType;
@@ -151,7 +156,8 @@ public class TestBucketAssigner {
 
   @Test
   public void testInsertOverBucketAssigned() {
-    
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), 
"2");
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
String.valueOf(512 * 1024));
     writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
 
     MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context, 
writeConfig);
@@ -402,6 +408,103 @@ public class TestBucketAssigner {
         writeProfile.getMetadataCache().size(), is(3));
   }
 
+  @Test
+  public void testWriteProfileRecordsPerBucketUsesProfiledRecordSize() {
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), 
"2");
+    
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
"1024");
+    writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+
+    WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+
+    assertThat("Average record size should use the configured estimate for an 
empty table",
+        writeProfile.getAvgSize(), is(1024L));
+    assertThat("Records per bucket should be derived from the max parquet file 
size",
+        writeProfile.getRecordsPerBucket(), is(1024L));
+  }
+
+  @Test
+  public void 
testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstimationThreshold()
 throws Exception {
+    conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
String.valueOf(1024 * 1024));
+    conf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
+    TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+    writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+    WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+    String latestInstant = getLastCompleteInstant(writeProfile);
+    HoodieCommitMetadata commitMetadata = 
writeProfile.getMetadataCache().get(latestInstant);
+    assertNotNull(commitMetadata);
+    long expectedAvgSize = (long) Math.ceil(
+        1.0 * commitMetadata.fetchTotalBytesWritten() / 
commitMetadata.fetchTotalRecordsWritten());
+
+    assertThat("Average record size should use commit metadata when it is 
large enough relative to small file limit",
+        writeProfile.getAvgSize(), is(expectedAvgSize));
+    assertThat("Records per bucket should use the profiled record size",
+        writeProfile.getRecordsPerBucket(), 
is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
+  }
+
+  @Test
+  public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio() 
throws Exception {
+    File morPath = new File(tempFile, "mor");
+    Configuration morConf = 
TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+    morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
 "1024");
+    morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 
"1");
+    
morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(),
 "0.5");
+    StreamerUtil.initTableIfNotExists(morConf);
+    TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+    HoodieWriteConfig morWriteConfig = 
FlinkWriteClients.getHoodieClientConfig(morConf);
+    HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+        
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+        new FlinkTaskContextSupplier(null));
+
+    DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, 
morContext);
+    String latestInstant = getLastCompleteInstant(writeProfile);
+    HoodieCommitMetadata commitMetadata = 
writeProfile.getMetadataCache().get(latestInstant);
+    assertNotNull(commitMetadata);
+    long expectedAvgSize = (long) Math.ceil(
+        0.5 * commitMetadata.fetchTotalBytesWritten() / 
commitMetadata.fetchTotalRecordsWritten());
+
+    assertThat("Average record size from commit metadata should be corrected 
for MOR log-to-parquet compression",
+        writeProfile.getAvgSize(), is(expectedAvgSize));
+    assertThat("Records per bucket should use the corrected MOR average record 
size",
+        writeProfile.getRecordsPerBucket(), 
is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
+  }
+
+  @Test
+  public void 
testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquetLogBlocks() 
throws Exception {
+    File morPath = new File(tempFile, "mor_parquet_logs");
+    Configuration morConf = 
TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+    morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+    morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+    
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
 "1024");
+    morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), 
"1");
+    
morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(),
 "0.5");
+    morConf.setString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(), 
"parquet");
+    StreamerUtil.initTableIfNotExists(morConf);
+    TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+    HoodieWriteConfig morWriteConfig = 
FlinkWriteClients.getHoodieClientConfig(morConf);
+    HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+        
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+        new FlinkTaskContextSupplier(null));
+
+    DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig, 
morContext);
+    String latestInstant = getLastCompleteInstant(writeProfile);
+    HoodieCommitMetadata commitMetadata = 
writeProfile.getMetadataCache().get(latestInstant);
+    assertNotNull(commitMetadata);
+    long expectedAvgSize = (long) Math.ceil(
+        1.0 * commitMetadata.fetchTotalBytesWritten() / 
commitMetadata.fetchTotalRecordsWritten());
+
+    assertThat("Average record size from parquet log blocks should not be 
corrected again",
+        writeProfile.getAvgSize(), is(expectedAvgSize));
+    assertThat("Records per bucket should use the uncorrected parquet log 
block average record size",
+        writeProfile.getRecordsPerBucket(), 
is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
+  }
+
   private static String getLastCompleteInstant(WriteProfile profile) {
     return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
   }
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
index 83b5b7f73cdf..6fb0dceb6fa9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
@@ -166,8 +166,13 @@ public class ITTestDynamicBucketStreamWrite {
   @ParameterizedTest
   @EnumSource(value = HoodieTableType.class)
   void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) {
+    Map<String, String> smallBucketOptions = Map.of(
+        HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1",
+        FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "1",
+        HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1",
+        HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
String.valueOf(1024 * 1024));
     streamTableEnv.executeSql(getTableDDL(
-        "t1", tableType, 
Collections.singletonMap(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(),
 "1"), true));
+        "t1", tableType, smallBucketOptions, true));
 
     execInsertSql(streamTableEnv, "insert into t1 values\n"
         + "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par_scale'),\n"

Reply via email to