This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6e767fed329 [HUDI-9473] Set memory config properly for FileGroup 
reader in Flink reader (#13372)
6e767fed329 is described below

commit 6e767fed329adf21eed2d08d581f9bf76dba1b20
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri May 30 08:36:36 2025 +0800

    [HUDI-9473] Set memory config properly for FileGroup reader in Flink reader 
(#13372)
---
 .../apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java    | 2 +-
 .../src/main/java/org/apache/hudi/util/FlinkClientUtil.java        | 7 +++++--
 .../src/main/java/org/apache/hudi/table/format/FormatUtils.java    | 2 +-
 .../hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java   | 7 +++++++
 4 files changed, 14 insertions(+), 4 deletions(-)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
index 520b5a6de76..a39bdcca1f0 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
@@ -80,7 +80,7 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, O> 
extends BaseFileGr
     if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
       internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
     }
-    TypedProperties props = 
FlinkClientUtil.getMergedTableAndWriteProps(hoodieTable.getMetaClient().getTableConfig(),
 config);
+    TypedProperties props = 
FlinkClientUtil.getReadProps(hoodieTable.getMetaClient().getTableConfig(), 
config);
     // Initializes file group reader
     try (HoodieFileGroupReader<T> fileGroupReader = 
HoodieFileGroupReader.<T>newBuilder()
         
.withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
index 3670fbdcbc1..a429a42b16b 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.util;
 
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -94,17 +95,19 @@ public class FlinkClientUtil {
   }
 
   /**
-   * Get merged {@link TypedProperties} from {@link HoodieTableConfig} and 
{@link HoodieWriteConfig}.
+   * Get merged {@link TypedProperties} from {@link HoodieTableConfig} and 
{@link HoodieWriteConfig}, which is used by FileGroup reader.
    *
    * @param tableConfig The hoodie table configuration
    * @param writeConfig the hoodie write configuration
    *
    * @return Merged {@link TypedProperties} from {@link HoodieTableConfig} and 
{@link HoodieWriteConfig}
    */
-  public static TypedProperties getMergedTableAndWriteProps(HoodieTableConfig 
tableConfig, HoodieWriteConfig writeConfig) {
+  public static TypedProperties getReadProps(HoodieTableConfig tableConfig, 
HoodieWriteConfig writeConfig) {
     TypedProperties props = new TypedProperties();
     props.putAll(tableConfig.getProps());
     writeConfig.getProps().forEach(props::putIfAbsent);
+    // For compatibility, log scanner uses MAX_MEMORY_FOR_COMPACTION for 
merging and FileGroup reader uses MAX_MEMORY_FOR_MERGE for merging.
+    props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(), 
writeConfig.getString(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION));
     return props;
   }
 }
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 9e98c895c7f..0a0b0449b2b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -126,7 +126,7 @@ public class FormatUtils {
             predicates,
             metaClient.getTableConfig(),
             instantRangeOption);
-    final TypedProperties typedProps = 
FlinkClientUtil.getMergedTableAndWriteProps(metaClient.getTableConfig(), 
writeConfig);
+    final TypedProperties typedProps = 
FlinkClientUtil.getReadProps(metaClient.getTableConfig(), writeConfig);
     typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
 
     return HoodieFileGroupReader.<RowData>newBuilder()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 726273266fb..d344134e3aa 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
 
 import org.apache.hudi.client.common.HoodieFlinkEngineContext;
 import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -905,6 +906,12 @@ public class TestData {
     // 1. init flink table
     HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
         .fromFile(hoodiePropertiesFile)
+        .withMemoryConfig(
+            HoodieMemoryConfig.newBuilder()
+                .withMaxMemoryMaxSize(
+                    FlinkOptions.WRITE_MERGE_MAX_MEMORY.defaultValue() * 1024 
* 1024L,
+                    FlinkOptions.COMPACTION_MAX_MEMORY.defaultValue() * 1024 * 
1024L)
+                .build())
         .withPath(basePath)
         .build();
     // deal with partial update merger

Reply via email to the mailing list.