This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2370615e9bb [HUDI-6216] Get table type and payload class from table
config in flink table factory when not provided (#8720)
2370615e9bb is described below
commit 2370615e9bb0b4d07d3946c52dd29d68456e6cb6
Author: Bingeng Huang <[email protected]>
AuthorDate: Tue May 16 17:12:23 2023 +0800
[HUDI-6216] Get table type and payload class from table config in flink
table factory when not provided (#8720)
Co-authored-by: hbg <[email protected]>
---
.../main/java/org/apache/hudi/table/HoodieTableFactory.java | 8 ++++++++
.../java/org/apache/hudi/table/TestHoodieTableFactory.java | 11 +++++++++++
2 files changed, 19 insertions(+)
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 20fd83ce147..58413926e51 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -126,6 +126,14 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
&& !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING,
tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
}
+ if (tableConfig.contains(HoodieTableConfig.TYPE)
+ && !conf.contains(FlinkOptions.TABLE_TYPE)) {
+ conf.setString(FlinkOptions.TABLE_TYPE,
tableConfig.getString(HoodieTableConfig.TYPE));
+ }
+ if (tableConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME)
+ && !conf.contains(FlinkOptions.PAYLOAD_CLASS_NAME)) {
+ conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME,
tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+ }
});
}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index ea9ab217564..03d39aeb99a 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -221,6 +221,9 @@ public class TestHoodieTableFactory {
tableConf.setString(FlinkOptions.TABLE_NAME, "t2");
tableConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0,f1");
tableConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");
+ tableConf.setString(FlinkOptions.TABLE_TYPE,
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+ tableConf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, "my_payload");
+ tableConf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
StreamerUtil.initTableIfNotExists(tableConf);
@@ -246,6 +249,14 @@ public class TestHoodieTableFactory {
source1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
assertThat("pre-combine key not provided, fallback to table config",
sink1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+ assertThat("table type not provided, fallback to table config",
+ source1.getConf().get(FlinkOptions.TABLE_TYPE),
is(FlinkOptions.TABLE_TYPE_MERGE_ON_READ));
+ assertThat("table type not provided, fallback to table config",
+ sink1.getConf().get(FlinkOptions.TABLE_TYPE),
is(FlinkOptions.TABLE_TYPE_MERGE_ON_READ));
+ assertThat("payload class not provided, fallback to table config",
+ source1.getConf().get(FlinkOptions.PAYLOAD_CLASS_NAME),
is("my_payload"));
+ assertThat("payload class not provided, fallback to table config",
+ sink1.getConf().get(FlinkOptions.PAYLOAD_CLASS_NAME),
is("my_payload"));
// write config always has higher priority
// set up a different primary key and pre_combine key with table config options