This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 92f68529819 [HUDI-7159] Check the table type between hoodie.properties and table options (#10209)
92f68529819 is described below

commit 92f6852981952a552cb7a5bf4623f8c3eb1cca73
Author: hehuiyuan <[email protected]>
AuthorDate: Sat Dec 9 11:11:20 2023 +0800

    [HUDI-7159] Check the table type between hoodie.properties and table options (#10209)
---
 .../org/apache/hudi/table/HoodieTableFactory.java     |  9 +++++++++
 .../org/apache/hudi/table/TestHoodieTableFactory.java | 19 +++++++++++++++----
 2 files changed, 24 insertions(+), 4 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 747617604cb..6751083a5cd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -125,6 +125,15 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
               && !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
             conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING, 
tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
           }
+          if (tableConfig.contains(HoodieTableConfig.TYPE) && 
conf.contains(FlinkOptions.TABLE_TYPE)) {
+            if 
(!tableConfig.getString(HoodieTableConfig.TYPE).equals(conf.get(FlinkOptions.TABLE_TYPE)))
 {
+              LOG.warn(
+                  String.format("Table type conflict : %s in %s and %s in 
table options. Fix the table type as to be in line with the hoodie.properties.",
+                      tableConfig.getString(HoodieTableConfig.TYPE), 
HoodieTableConfig.HOODIE_PROPERTIES_FILE,
+                      conf.get(FlinkOptions.TABLE_TYPE)));
+              conf.setString(FlinkOptions.TABLE_TYPE, 
tableConfig.getString(HoodieTableConfig.TYPE));
+            }
+          }
           if (tableConfig.contains(HoodieTableConfig.TYPE)
               && !conf.contains(FlinkOptions.TABLE_TYPE)) {
             conf.setString(FlinkOptions.TABLE_TYPE, 
tableConfig.getString(HoodieTableConfig.TYPE));
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index d3a48ae63b7..64145abd5bb 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -205,20 +205,31 @@ public class TestHoodieTableFactory {
     final MockContext sourceContext1 = MockContext.getInstance(this.conf, 
schema, "f2");
     assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext1));
 
-    // Invalid table type will throw exception
+    // Invalid table type will throw exception if the hoodie.properties does 
not exist.
+    this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath() + 
"_NOT_EXIST_TABLE_PATH");
     this.conf.set(FlinkOptions.TABLE_TYPE, "INVALID_TABLE_TYPE");
     final MockContext sourceContext2 = MockContext.getInstance(this.conf, 
schema, "f2");
     assertThrows(HoodieValidationException.class, () -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext2));
+    this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath());
 
-    // Valid table type will be ok
-    this.conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+    // Invalid table type will be ok if the hoodie.properties exists.
+    this.conf.set(FlinkOptions.TABLE_TYPE, "INVALID_TABLE_TYPE");
     final MockContext sourceContext3 = MockContext.getInstance(this.conf, 
schema, "f2");
     assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext3));
 
     // Valid table type will be ok
-    this.conf.set(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
+    this.conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
     final MockContext sourceContext4 = MockContext.getInstance(this.conf, 
schema, "f2");
     assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext4));
+
+    // Setup the table type correctly for hoodie.properties
+    HoodieTableSink hoodieTableSink = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(sourceContext4);
+    assertThat(hoodieTableSink.getConf().get(FlinkOptions.TABLE_TYPE), 
is("COPY_ON_WRITE"));
+
+    // Valid table type will be ok
+    this.conf.set(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
+    final MockContext sourceContext5 = MockContext.getInstance(this.conf, 
schema, "f2");
+    assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext5));
   }
 
   @Test

Reply via email to