This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 2370615e9bb [HUDI-6216] Get table type and payload class from table config in flink table factory when not provided (#8720)
2370615e9bb is described below

commit 2370615e9bb0b4d07d3946c52dd29d68456e6cb6
Author: Bingeng Huang <[email protected]>
AuthorDate: Tue May 16 17:12:23 2023 +0800

    [HUDI-6216] Get table type and payload class from table config in flink table factory when not provided (#8720)
    
    Co-authored-by: hbg <[email protected]>
---
 .../main/java/org/apache/hudi/table/HoodieTableFactory.java   |  8 ++++++++
 .../java/org/apache/hudi/table/TestHoodieTableFactory.java    | 11 +++++++++++
 2 files changed, 19 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 20fd83ce147..58413926e51 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -126,6 +126,14 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
               && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
             conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, 
tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
           }
+          if (tableConfig.contains(HoodieTableConfig.TYPE)
+              && !conf.contains(FlinkOptions.TABLE_TYPE)) {
+            conf.setString(FlinkOptions.TABLE_TYPE, 
tableConfig.getString(HoodieTableConfig.TYPE));
+          }
+          if (tableConfig.contains(HoodieTableConfig.PAYLOAD_CLASS_NAME)
+              && !conf.contains(FlinkOptions.PAYLOAD_CLASS_NAME)) {
+            conf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, 
tableConfig.getString(HoodieTableConfig.PAYLOAD_CLASS_NAME));
+          }
         });
   }
 
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index ea9ab217564..03d39aeb99a 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -221,6 +221,9 @@ public class TestHoodieTableFactory {
     tableConf.setString(FlinkOptions.TABLE_NAME, "t2");
     tableConf.setString(FlinkOptions.RECORD_KEY_FIELD, "f0,f1");
     tableConf.setString(FlinkOptions.PRECOMBINE_FIELD, "f2");
+    tableConf.setString(FlinkOptions.TABLE_TYPE, 
FlinkOptions.TABLE_TYPE_MERGE_ON_READ);
+    tableConf.setString(FlinkOptions.PAYLOAD_CLASS_NAME, "my_payload");
+    tableConf.setString(FlinkOptions.PARTITION_PATH_FIELD, "partition");
 
     StreamerUtil.initTableIfNotExists(tableConf);
 
@@ -246,6 +249,14 @@ public class TestHoodieTableFactory {
         source1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
     assertThat("pre-combine key not provided, fallback to table config",
         sink1.getConf().get(FlinkOptions.PRECOMBINE_FIELD), is("f2"));
+    assertThat("table type not provided, fallback to table config",
+        source1.getConf().get(FlinkOptions.TABLE_TYPE), 
is(FlinkOptions.TABLE_TYPE_MERGE_ON_READ));
+    assertThat("table type not provided, fallback to table config",
+        sink1.getConf().get(FlinkOptions.TABLE_TYPE), 
is(FlinkOptions.TABLE_TYPE_MERGE_ON_READ));
+    assertThat("payload class not provided, fallback to table config",
+        source1.getConf().get(FlinkOptions.PAYLOAD_CLASS_NAME), 
is("my_payload"));
+    assertThat("payload class not provided, fallback to table config",
+        sink1.getConf().get(FlinkOptions.PAYLOAD_CLASS_NAME), 
is("my_payload"));
 
     // write config always has higher priority
     // set up a different primary key and pre_combine key with table config options

Reply via email to