This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1ff99ca [HUDI-1786] Add option for merge max memory (#2805)
1ff99ca is described below
commit 1ff99ca7d7bbdcb24ebae96c434a35000eded1fc
Author: Danny Chan <[email protected]>
AuthorDate: Mon Apr 12 17:03:58 2021 +0800
[HUDI-1786] Add option for merge max memory (#2805)
---
.../src/main/java/org/apache/hudi/configuration/FlinkOptions.java | 6 ++++++
hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java | 2 +-
2 files changed, 7 insertions(+), 1 deletion(-)
diff --git
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b120714..c47ea95 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -273,6 +273,12 @@ public class FlinkOptions {
.defaultValue(128)
.withDescription("Max log block size in MB for log file, default 128MB");
+ public static final ConfigOption<Integer> WRITE_MERGE_MAX_MEMORY =
ConfigOptions
+ .key("write.merge.max_memory")
+ .intType()
+ .defaultValue(100) // default 100 MB
+ .withDescription("Max memory in MB for merge, default 100MB");
+
// ------------------------------------------------------------------------
// Compaction Options
// ------------------------------------------------------------------------
diff --git a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 3cc5d56..a7295fb 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -207,7 +207,7 @@ public class StreamerUtil {
.withMemoryConfig(
HoodieMemoryConfig.newBuilder()
.withMaxMemoryMaxSize(
- conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) *
1024 * 1024L,
+ conf.getInteger(FlinkOptions.WRITE_MERGE_MAX_MEMORY) *
1024 * 1024L,
conf.getInteger(FlinkOptions.COMPACTION_MAX_MEMORY) *
1024 * 1024L
).build())
.forTable(conf.getString(FlinkOptions.TABLE_NAME))