This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5579715ef7d9 fix(flink): reuse the preceding avg size if there is no
eligible estimation (#19022)
5579715ef7d9 is described below
commit 5579715ef7d926fc68c9b0dc1918ff835c798e89
Author: Danny Chan <[email protected]>
AuthorDate: Wed Jun 17 11:25:41 2026 +0800
fix(flink): reuse the preceding avg size if there is no eligible
estimation (#19022)
* fix(flink): reuse the preceding avg size if there is no eligible
estimation
---
.../partitioner/profile/DeltaWriteProfile.java | 6 +-
.../sink/partitioner/profile/WriteProfile.java | 8 +-
.../hudi/sink/partitioner/TestBucketAssigner.java | 85 ++++++++++++++++++++++
3 files changed, 90 insertions(+), 9 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
index a79c78e6d363..2cdee3453a46 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/DeltaWriteProfile.java
@@ -29,8 +29,6 @@ import
org.apache.hudi.common.table.view.SyncableFileSystemView;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.table.action.commit.SmallFile;
-import lombok.extern.slf4j.Slf4j;
-
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
@@ -41,7 +39,6 @@ import java.util.stream.Collectors;
*
* <p>Note: assumes the index can always index log files for Flink write.
*/
-@Slf4j
public class DeltaWriteProfile extends WriteProfile {
public DeltaWriteProfile(HoodieWriteConfig config, HoodieFlinkEngineContext
context) {
@@ -93,7 +90,7 @@ public class DeltaWriteProfile extends WriteProfile {
@Override
protected long averageBytesPerRecord() {
- long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+ long avgSize = this.avgSize > 0 ? this.avgSize :
config.getCopyOnWriteRecordSizeEstimate();
HoodieTimeline commitTimeline =
metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
long sizeFromCommitMetadata =
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
@@ -109,7 +106,6 @@ public class DeltaWriteProfile extends WriteProfile {
}
}
}
- log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
index c3819bf08116..304d8f5d7452 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/profile/WriteProfile.java
@@ -78,7 +78,7 @@ public class WriteProfile {
* The average record size.
*/
@Getter
- private long avgSize = -1L;
+ protected long avgSize = -1L;
/**
* Total records to write for each bucket based on
@@ -134,7 +134,7 @@ public class WriteProfile {
* records pack into one file.
*/
protected long averageBytesPerRecord() {
- long avgSize = config.getCopyOnWriteRecordSizeEstimate();
+ long avgSize = this.avgSize > 0 ? this.avgSize :
config.getCopyOnWriteRecordSizeEstimate();
HoodieTimeline commitTimeline =
metaClient.getCommitTimeline().filterCompletedInstants();
if (!commitTimeline.empty()) {
long sizeFromCommitMetadata =
calculateRecordSizeThroughCommitMetadata(commitTimeline, 1.0D);
@@ -142,7 +142,6 @@ public class WriteProfile {
avgSize = sizeFromCommitMetadata;
}
}
- log.info("Refresh average bytes per record => " + avgSize);
return avgSize;
}
@@ -238,8 +237,9 @@ public class WriteProfile {
private void recordProfile() {
this.avgSize = averageBytesPerRecord();
+ log.info("Refresh average bytes per record => {}", avgSize);
this.recordsPerBucket = config.getParquetMaxFileSize() / avgSize;
- log.info("Refresh insert records per bucket => " + recordsPerBucket);
+ log.info("Refresh insert records per bucket => {}", recordsPerBucket);
}
/**
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
index 8281ebbd86e4..3b5c265f7c91 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/partitioner/TestBucketAssigner.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.configuration.FlinkOptions;
@@ -46,11 +47,13 @@ import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
+import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -444,6 +447,26 @@ public class TestBucketAssigner {
writeProfile.getRecordsPerBucket(),
is(writeConfig.getParquetMaxFileSize() / expectedAvgSize));
}
+ @Test
+ public void
testWriteProfileReusesPreviousAvgSizeWhenNoEligibleCommitOnReload() throws
Exception {
+ conf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+
conf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
"1024");
+ TestData.writeData(TestData.DATA_SET_INSERT, conf);
+
+ writeConfig = FlinkWriteClients.getHoodieClientConfig(conf);
+ setScriptedRecordSizes(512L, -1L);
+ WriteProfile writeProfile = new
ScriptedRecordSizeWriteProfile(writeConfig, context);
+ assertThat("Average record size should use the profiled commit metadata",
+ writeProfile.getAvgSize(), is(512L));
+
+ writeProfile.reload(1);
+
+ assertThat("Average record size should reuse the previous estimate when no
eligible commit metadata is found",
+ writeProfile.getAvgSize(), is(512L));
+ assertThat("Records per bucket should continue to use the previous
estimate",
+ writeProfile.getRecordsPerBucket(),
is(writeConfig.getParquetMaxFileSize() / 512L));
+ }
+
@Test
public void testDeltaWriteProfileRecordsPerBucketUsesCompressionRatio()
throws Exception {
File morPath = new File(tempFile, "mor");
@@ -505,6 +528,34 @@ public class TestBucketAssigner {
writeProfile.getRecordsPerBucket(),
is(morWriteConfig.getParquetMaxFileSize() / expectedAvgSize));
}
+ @Test
+ public void
testDeltaWriteProfileReusesPreviousAvgSizeWhenNoEligibleDeltaCommitOnReload()
throws Exception {
+ File morPath = new File(tempFile, "mor_reuse_previous_avg");
+ Configuration morConf =
TestConfigurations.getDefaultConf(morPath.getAbsolutePath());
+ morConf.set(FlinkOptions.TABLE_TYPE, HoodieTableType.MERGE_ON_READ.name());
+ morConf.set(FlinkOptions.WRITE_PARQUET_MAX_FILE_SIZE, 1);
+
morConf.setString(HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
"1024");
+ StreamerUtil.initTableIfNotExists(morConf);
+ TestData.writeData(TestData.DATA_SET_INSERT, morConf);
+
+ HoodieWriteConfig morWriteConfig =
FlinkWriteClients.getHoodieClientConfig(morConf);
+ HoodieFlinkEngineContext morContext = new HoodieFlinkEngineContext(
+
HadoopFSUtils.getStorageConf(HadoopConfigurations.getHadoopConf(morConf)),
+ new FlinkTaskContextSupplier(null));
+
+ setScriptedRecordSizes(256L, -1L);
+ DeltaWriteProfile writeProfile = new
ScriptedRecordSizeDeltaWriteProfile(morWriteConfig, morContext);
+ assertThat("Average record size should use the profiled delta commit
metadata",
+ writeProfile.getAvgSize(), is(256L));
+
+ writeProfile.reload(1);
+
+ assertThat("Average record size should reuse the previous estimate when no
eligible delta commit metadata is found",
+ writeProfile.getAvgSize(), is(256L));
+ assertThat("Records per bucket should continue to use the previous
estimate",
+ writeProfile.getRecordsPerBucket(),
is(morWriteConfig.getParquetMaxFileSize() / 256L));
+ }
+
private static String getLastCompleteInstant(WriteProfile profile) {
return StreamerUtil.getLastCompletedInstant(profile.getMetaClient());
}
@@ -526,6 +577,40 @@ public class TestBucketAssigner {
assertThat(bucketInfo.getBucketType(), is(bucketType));
}
+ private static Queue<Long> scriptedRecordSizes = new ArrayDeque<>();
+
+ private static void setScriptedRecordSizes(Long... recordSizes) {
+ scriptedRecordSizes = new ArrayDeque<>(Arrays.asList(recordSizes));
+ }
+
+ /**
+ * WriteProfile with scripted record size estimates.
+ */
+ static class ScriptedRecordSizeWriteProfile extends WriteProfile {
+ ScriptedRecordSizeWriteProfile(HoodieWriteConfig config,
HoodieFlinkEngineContext context) {
+ super(config, context);
+ }
+
+ @Override
+ protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline
commitTimeline, double fileSizeCalibrationRatio) {
+ return scriptedRecordSizes.remove();
+ }
+ }
+
+ /**
+ * DeltaWriteProfile with scripted record size estimates.
+ */
+ static class ScriptedRecordSizeDeltaWriteProfile extends DeltaWriteProfile {
+ ScriptedRecordSizeDeltaWriteProfile(HoodieWriteConfig config,
HoodieFlinkEngineContext context) {
+ super(config, context);
+ }
+
+ @Override
+ protected long calculateRecordSizeThroughCommitMetadata(HoodieTimeline
commitTimeline, double fileSizeCalibrationRatio) {
+ return scriptedRecordSizes.remove();
+ }
+ }
+
/**
* Mock BucketAssigner that can specify small files explicitly.
*/