This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6e767fed329 [HUDI-9473] Set memory config properly for FileGroup reader in Flink reader (#13372)
6e767fed329 is described below
commit 6e767fed329adf21eed2d08d581f9bf76dba1b20
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri May 30 08:36:36 2025 +0800
[HUDI-9473] Set memory config properly for FileGroup reader in Flink reader (#13372)
---
.../apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java | 2 +-
.../src/main/java/org/apache/hudi/util/FlinkClientUtil.java | 7 +++++--
.../src/main/java/org/apache/hudi/table/format/FormatUtils.java | 2 +-
.../hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java | 7 +++++++
4 files changed, 14 insertions(+), 4 deletions(-)
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
index 520b5a6de76..a39bdcca1f0 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/FlinkFileGroupReaderBasedMergeHandle.java
@@ -80,7 +80,7 @@ public class FlinkFileGroupReaderBasedMergeHandle<T, I, K, O>
extends BaseFileGr
if (!StringUtils.isNullOrEmpty(config.getInternalSchema())) {
internalSchemaOption = SerDeHelper.fromJson(config.getInternalSchema());
}
- TypedProperties props =
FlinkClientUtil.getMergedTableAndWriteProps(hoodieTable.getMetaClient().getTableConfig(),
config);
+ TypedProperties props =
FlinkClientUtil.getReadProps(hoodieTable.getMetaClient().getTableConfig(),
config);
// Initializes file group reader
try (HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder()
.withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
diff --git
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
index 3670fbdcbc1..a429a42b16b 100644
---
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
+++
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/FlinkClientUtil.java
@@ -18,6 +18,7 @@
package org.apache.hudi.util;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -94,17 +95,19 @@ public class FlinkClientUtil {
}
/**
- * Get merged {@link TypedProperties} from {@link HoodieTableConfig} and
{@link HoodieWriteConfig}.
+ * Get merged {@link TypedProperties} from {@link HoodieTableConfig} and
{@link HoodieWriteConfig}, which is used by FileGroup reader.
*
* @param tableConfig The hoodie table configuration
* @param writeConfig the hoodie write configuration
*
* @return Merged {@link TypedProperties} from {@link HoodieTableConfig} and
{@link HoodieWriteConfig}
*/
- public static TypedProperties getMergedTableAndWriteProps(HoodieTableConfig
tableConfig, HoodieWriteConfig writeConfig) {
+ public static TypedProperties getReadProps(HoodieTableConfig tableConfig,
HoodieWriteConfig writeConfig) {
TypedProperties props = new TypedProperties();
props.putAll(tableConfig.getProps());
writeConfig.getProps().forEach(props::putIfAbsent);
+ // For compatibility, log scanner uses MAX_MEMORY_FOR_COMPACTION for
merging and FileGroup reader uses MAX_MEMORY_FOR_MERGE for merging.
+ props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),
writeConfig.getString(HoodieMemoryConfig.MAX_MEMORY_FOR_COMPACTION));
return props;
}
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
index 9e98c895c7f..0a0b0449b2b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/format/FormatUtils.java
@@ -126,7 +126,7 @@ public class FormatUtils {
predicates,
metaClient.getTableConfig(),
instantRangeOption);
- final TypedProperties typedProps =
FlinkClientUtil.getMergedTableAndWriteProps(metaClient.getTableConfig(),
writeConfig);
+ final TypedProperties typedProps =
FlinkClientUtil.getReadProps(metaClient.getTableConfig(), writeConfig);
typedProps.put(HoodieReaderConfig.MERGE_TYPE.key(), mergeType);
return HoodieFileGroupReader.<RowData>newBuilder()
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
index 726273266fb..d344134e3aa 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestData.java
@@ -20,6 +20,7 @@ package org.apache.hudi.utils;
import org.apache.hudi.client.common.HoodieFlinkEngineContext;
import org.apache.hudi.client.model.PartialUpdateFlinkRecordMerger;
+import org.apache.hudi.common.config.HoodieMemoryConfig;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecordMerger;
@@ -905,6 +906,12 @@ public class TestData {
// 1. init flink table
HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
.fromFile(hoodiePropertiesFile)
+ .withMemoryConfig(
+ HoodieMemoryConfig.newBuilder()
+ .withMaxMemoryMaxSize(
+ FlinkOptions.WRITE_MERGE_MAX_MEMORY.defaultValue() * 1024
* 1024L,
+ FlinkOptions.COMPACTION_MAX_MEMORY.defaultValue() * 1024 *
1024L)
+ .build())
.withPath(basePath)
.build();
// deal with partial update merger