This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e853440bac04 fix(flink): fix the mor small file record size estimation (#18991)
e853440bac04 is described below
commit e853440bac04f1ca9869a749d7f69d409d647ab3
Author: Danny Chan <[email protected]>
AuthorDate: Tue Jun 16 17:01:20 2026 +0800
fix(flink): fix the mor small file record size estimation (#18991)
---
.../partitioner/profile/DeltaWriteProfile.java | 36 ++++++-
.../sink/partitioner/profile/WriteProfile.java | 60 +++++++-----
.../hudi/sink/partitioner/TestBucketAssigner.java | 105 ++++++++++++++++++++-
.../hudi/table/ITTestDynamicBucketStreamWrite.java | 7 +-
4 files changed, 179 insertions(+), 29 deletions(-)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index f73adb37d337..a79c78e6d363 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -22,12 +22,15 @@ import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile;
+import lombok.extern.slf4j.Slf4j;
+
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -38,7 +41,9 @@ import java.util.stream.Collectors;
*
* <p>Note: assumes the index can always index log files for Flink write.
*/
+@Slf4j
public class DeltaWriteProfile extends WriteProfile {
+
public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext
context) {
super(config, context);
}
@@ -86,12 +91,34 @@ public class DeltaWriteProfile extends WriteProfile {
return smallFileLocations;
}
+ @Override
+ protected long averageBytesPerRecord() {
+ long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+ HoodieTimeline commitTimeline =
metaClient.getCommitTimeline().filterCompletedInstants();
+ if (!commitTimeline.empty()) {
+ long sizeFromCommitMetadata =
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
+ if (sizeFromCommitMetadata > 0) {
+ avgSize = sizeFromCommitMetadata;
+ }
+ } else {
+ HoodieTimeline deltaCommitTimeline =
metaClient.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+ if (!deltaCommitTimeline.empty()) {
+ long sizeFromCommitMetadata =
calculateRecordSizeThroughCommitMetadata(deltaCommitTimeline,
logFileToParquetCompressionRatio());
+ if (sizeFromCommitMetadata > 0) {
+ avgSize = sizeFromCommitMetadata;
+ }
+ }
+ }
+ log.info("Refresh average bytes per record => " + avgSize);
+ return avgSize;
+ }
+
protected SyncableFileSystemView getFileSystemView() {
return (SyncableFileSystemView) getTable().getSliceView();
}
private long getTotalFileSize(FileSlice fileSlice) {
- return
fileSlice.getTotalFileSizeAsParquetFormat(config.getLogFileToParquetCompressionRatio());
+ return
fileSlice.getTotalFileSizeAsParquetFormat(logFileToParquetCompressionRatio());
}
private boolean isSmallFile(FileSlice fileSlice) {
@@ -99,4 +126,11 @@ public class DeltaWriteProfile extends WriteProfile {
return totalSize < config.getParquetMaxFileSize();
}
+ private double logFileToParquetCompressionRatio() {
+ if (config.getLogDataBlockFormat().isPresent()
+ && config.getLogDataBlockFormat().get() ==
HoodieLogBlock.HoodieLogBlockType.PARQUET_DATA_BLOCK) {
+ return 1D;
+ }
+ return config.getLogFileToParquetCompressionRatio();
+ }
}
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index cbcc71cedb6e..c3819bf08116 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -117,7 +117,6 @@ public class WriteProfile {
this.context = context;
this.basePath = new Path(config.getBasePath());
this.smallFilesMap = new HashMap<>();
- this.recordsPerBucket = config.getCopyOnWriteInsertSplitSize();
this.metaClient = StreamerUtil.createMetaClient(
config.getBasePath(),
context.getStorageConf().unwrapAs(Configuration.class));
this.metadataCache = new HashMap<>();
@@ -134,35 +133,46 @@ public class WriteProfile {
* Obtains the average record size based on records written during previous
commits. Used for estimating how many
* records pack into one file.
*/
- private long averageBytesPerRecord() {
+ protected long averageBytesPerRecord() {
long avgSize = config.getCopyOnWriteRecordSizeEstimate();
- long fileSizeThreshold = (long) (config.getRecordSizeEstimationThreshold()
* config.getParquetSmallFileLimit());
- HoodieTimeline commitTimeline =
metaClient.getCommitsTimeline().filterCompletedInstants();
+ HoodieTimeline commitTimeline =
metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
- // Go over the reverse ordered commits to get a more recent estimate of
average record size.
- Iterator<HoodieInstant> instants =
commitTimeline.getReverseOrderedInstants().iterator();
- while (instants.hasNext()) {
- HoodieInstant instant = instants.next();
- final HoodieCommitMetadata commitMetadata =
- this.metadataCache.computeIfAbsent(
- instant.requestedTime(),
- k ->
WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant,
commitTimeline)
- .orElse(null));
- if (commitMetadata == null) {
- continue;
- }
- long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
- long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
- if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
- avgSize = (long) Math.ceil((1.0 * totalBytesWritten) /
totalRecordsWritten);
- break;
- }
+ long sizeFromCommitMetadata =
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
+ if (sizeFromCommitMetadata > 0) {
+ avgSize = sizeFromCommitMetadata;
}
}
log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}
+ protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline
commitTimeline, double fileSizeCalibrationRatio) {
+ long fileSizeThreshold = recordSizeEstimationFileSizeThreshold();
+ // Go over the reverse ordered commits to get a more recent estimate of
average record size.
+ Iterator<HoodieInstant> instants =
commitTimeline.getReverseOrderedInstants().iterator();
+ while (instants.hasNext()) {
+ HoodieInstant instant = instants.next();
+ final HoodieCommitMetadata commitMetadata =
+ this.metadataCache.computeIfAbsent(
+ instant.requestedTime(),
+ k ->
WriteProfiles.getCommitMetadataSafely(config.getTableName(), basePath, instant,
commitTimeline)
+ .orElse(null));
+ if (commitMetadata == null) {
+ continue;
+ }
+ long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+ long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
+ if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) {
+ return (long) Math.ceil((fileSizeCalibrationRatio * totalBytesWritten)
/ totalRecordsWritten);
+ }
+ }
+ return -1L;
+ }
+
+ private long recordSizeEstimationFileSizeThreshold() {
+ return (long) (0.1D * config.getParquetSmallFileLimit());
+ }
+
/**
* Returns a list of small files in the given partition path.
*
@@ -228,10 +238,8 @@ public class WriteProfile {
private void recordProfile() {
this.avgSize = averageBytesPerRecord();
- if (config.shouldAllowMultiWriteOnSameInstant()) {
- this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
- log.info("Refresh insert records per bucket => " + recordsPerBucket);
- }
+ this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
+ log.info("Refresh insert records per bucket => " + recordsPerBucket);
}
/**
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 4f682084050b..8281ebbd86e4 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -19,11 +19,16 @@
package org.apache.hudi.sink.partitioner;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
+import org.apache.hudi.common.config.HoodieStorageConfig;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
+import org.apache.hudi.sink.partitioner.profile.DeltaWriteProfile;
import org.apache.hudi.sink.partitioner.profile.WriteProfile;
import org.apache.hudi.table.action.commit.BucketInfo;
import org.apache.hudi.table.action.commit.BucketType;
@@ -151,7 +156,8 @@ public class TestBucketAssigner {
@Test
public void testInsertOverBucketAssigned() {
-
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(),
"2");
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
String.valueOf(512 * 1024));
writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
MockBucketAssigner mockBucketAssigner = new MockBucketAssigner(context,
writeConfig);
@@ -402,6 +408,103 @@ public class TestBucketAssigner {
writeProfile.getMetadataCache().size(), is(3));
}
+ @Test
+ public void testWriteProfileRecordsPerBucketUsesProfiledRecordSize() {
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(),
"2");
+
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
"1024");
+ writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+
+ WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+
+ assertThat("Average record size should use the configured estimate for an
empty table",
+ writeProfile.getAvgSize(), is(1024L));
+ assertThat("Records per bucket should be derived from the max parquet file
size",
+ writeProfile.getRecordsPerBucket(), is(1024L));
+ }
+
+ @Test
+ public void
testWriteProfileRecordsPerBucketUsesProfiledRecordSizeWithSmallEstimationThreshold()
throws Exception {
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
String.valueOf(1024 * 1024));
+ conf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1");
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+ WriteProfile writeProfile = new WriteProfile(writeConfig, context);
+ String latestInstant = getLastCompleteInstant(writeProfile);
+ HoodieCommitMetadata commitMetadata =
writeProfile.getMetadataCache().get(latestInstant);
+ assertNotNull(commitMetadata);
+ long expectedAvgSize = (long) Math.ceil(
+ 1.0 * commitMetadata.fetchTotalBytesWritten() /
commitMetadata.fetchTotalRecordsWritten());
+
+ assertThat("Average record size should use commit metadata when it is
large enough relative to small file limit",
+ writeProfile.getAvgSize(), is(expectedAvgSize));
+ assertThat("Records per bucket should use the profiled record size",
+ writeProfile.getRecordsPerBucket(),
is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
+ }
+
+ @Test
+ public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio()
throws Exception {
+ File morPath = new File(tempFile, "mor");
+ Configuration morConf =
TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+ morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
"1024");
+ morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(),
"1");
+
morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(),
"0.5");
+ StreamerUtil.initTableIfNotExists(morConf);
+ TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+ HoodieWriteConfig morWriteConfig =
FlinkWriteClients.getHoodieClientConfig(morConf);
+ HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+ new FlinkTaskContextSupplier(null));
+
+ DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig,
morContext);
+ String latestInstant = getLastCompleteInstant(writeProfile);
+ HoodieCommitMetadata commitMetadata =
writeProfile.getMetadataCache().get(latestInstant);
+ assertNotNull(commitMetadata);
+ long expectedAvgSize = (long) Math.ceil(
+ 0.5 * commitMetadata.fetchTotalBytesWritten() /
commitMetadata.fetchTotalRecordsWritten());
+
+ assertThat("Average record size from commit metadata should be corrected
for MOR log-to-parquet compression",
+ writeProfile.getAvgSize(), is(expectedAvgSize));
+ assertThat("Records per bucket should use the corrected MOR average record
size",
+ writeProfile.getRecordsPerBucket(),
is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
+ }
+
+ @Test
+ public void
testDeltaWriteProfileRecordsPerBucketSkipsCompressionRatioForParquetLogBlocks()
throws Exception {
+ File morPath = new File(tempFile, "mor_parquet_logs");
+ Configuration morConf =
TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+ morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
"1024");
+ morConf.setString(HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(),
"1");
+
morConf.setString(HoodieStorageConfig.LOGFILE_TO_PARQUET_COMPRESSION_RATIO_FRACTION.key(),
"0.5");
+ morConf.setString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_FORMAT.key(),
"parquet");
+ StreamerUtil.initTableIfNotExists(morConf);
+ TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+ HoodieWriteConfig morWriteConfig =
FlinkWriteClients.getHoodieClientConfig(morConf);
+ HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+ new FlinkTaskContextSupplier(null));
+
+ DeltaWriteProfile writeProfile = new DeltaWriteProfile(morWriteConfig,
morContext);
+ String latestInstant = getLastCompleteInstant(writeProfile);
+ HoodieCommitMetadata commitMetadata =
writeProfile.getMetadataCache().get(latestInstant);
+ assertNotNull(commitMetadata);
+ long expectedAvgSize = (long) Math.ceil(
+ 1.0 * commitMetadata.fetchTotalBytesWritten() /
commitMetadata.fetchTotalRecordsWritten());
+
+ assertThat("Average record size from parquet log blocks should not be
corrected again",
+ writeProfile.getAvgSize(), is(expectedAvgSize));
+ assertThat("Records per bucket should use the uncorrected parquet log
block average record size",
+ writeProfile.getRecordsPerBucket(),
is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
+ }
+
private static String getLastCompleteInstant(WriteProfile profile) {
return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
index 83b5b7f73cdf..6fb0dceb6fa9 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestDynamicBucketStreamWrite.java
@@ -166,8 +166,13 @@ public class ITTestDynamicBucketStreamWrite {
@ParameterizedTest
@EnumSource(value = HoodieTableType.class)
void testBucketScalesUpWithContinuousWrites(HoodieTableType tableType) {
+ Map<String, String> smallBucketOptions = Map.of(
+ HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(), "1",
+ FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE.key(), "1",
+ HoodieCompactionConfig.PARQUET_SMALL_FILE_LIMIT.key(), "1",
+ HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
String.valueOf(1024 * 1024));
streamTableEnv.executeSql(getTableDDL(
- "t1", tableType,
Collections.singletonMap(HoodieCompactionConfig.COPY_ON_WRITE_INSERT_SPLIT_SIZE.key(),
"1"), true));
+ "t1", tableType, smallBucketOptions, true));
execInsertSql(streamTableEnv, "insert into t1 values\n"
+ "('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par_scale'),\n"