This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 92f68529819 [HUDI-7159] Check the table type between hoodie.properties
and table options (#10209)
92f68529819 is described below
commit 92f6852981952a552cb7a5bf4623f8c3eb1cca73
Author: hehuiyuan <[email protected]>
AuthorDate: Sat Dec 9 11:11:20 2023 +0800
[HUDI-7159] Check the table type between hoodie.properties and table options
(#10209)
---
.../org/apache/hudi/table/HoodieTableFactory.java | 9 +++++++++
.../org/apache/hudi/table/TestHoodieTableFactory.java | 19 +++++++++++++++----
2 files changed, 24 insertions(+), 4 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 747617604cb..6751083a5cd 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -125,6 +125,15 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
&& !conf.contains(FlinkOptions.HIVE_STYLE_PARTITIONING)) {
conf.setBoolean(FlinkOptions.HIVE_STYLE_PARTITIONING,
tableConfig.getBoolean(HoodieTableConfig.HIVE_STYLE_PARTITIONING_ENABLE));
}
+ if (tableConfig.contains(HoodieTableConfig.TYPE) &&
conf.contains(FlinkOptions.TABLE_TYPE)) {
+ if
(!tableConfig.getString(HoodieTableConfig.TYPE).equals(conf.get(FlinkOptions.TABLE_TYPE)))
{
+ LOG.warn(
+ String.format("Table type conflict : %s in %s and %s in
table options. Fix the table type as to be in line with the hoodie.properties.",
+ tableConfig.getString(HoodieTableConfig.TYPE),
HoodieTableConfig.HOODIE_PROPERTIES_FILE,
+ conf.get(FlinkOptions.TABLE_TYPE)));
+ conf.setString(FlinkOptions.TABLE_TYPE,
tableConfig.getString(HoodieTableConfig.TYPE));
+ }
+ }
if (tableConfig.contains(HoodieTableConfig.TYPE)
&& !conf.contains(FlinkOptions.TABLE_TYPE)) {
conf.setString(FlinkOptions.TABLE_TYPE,
tableConfig.getString(HoodieTableConfig.TYPE));
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index d3a48ae63b7..64145abd5bb 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -205,20 +205,31 @@ public class TestHoodieTableFactory {
final MockContext sourceContext1 = MockContext.getInstance(this.conf,
schema, "f2");
assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(sourceContext1));
- // Invalid table type will throw exception
+ // Invalid table type will throw exception if the hoodie.properties does
not exist.
+ this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath() +
"_NOT_EXIST_TABLE_PATH");
this.conf.set(FlinkOptions.TABLE_TYPE, "INVALID_TABLE_TYPE");
final MockContext sourceContext2 = MockContext.getInstance(this.conf,
schema, "f2");
assertThrows(HoodieValidationException.class, () -> new
HoodieTableFactory().createDynamicTableSink(sourceContext2));
+ this.conf.setString(FlinkOptions.PATH, tempFile.getAbsolutePath());
- // Valid table type will be ok
- this.conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
+ // Invalid table type will be ok if the hoodie.properties exists.
+ this.conf.set(FlinkOptions.TABLE_TYPE, "INVALID_TABLE_TYPE");
final MockContext sourceContext3 = MockContext.getInstance(this.conf,
schema, "f2");
assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(sourceContext3));
// Valid table type will be ok
- this.conf.set(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
+ this.conf.set(FlinkOptions.TABLE_TYPE, "MERGE_ON_READ");
final MockContext sourceContext4 = MockContext.getInstance(this.conf,
schema, "f2");
assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(sourceContext4));
+
+ // Setup the table type correctly for hoodie.properties
+ HoodieTableSink hoodieTableSink = (HoodieTableSink) new
HoodieTableFactory().createDynamicTableSink(sourceContext4);
+ assertThat(hoodieTableSink.getConf().get(FlinkOptions.TABLE_TYPE),
is("COPY_ON_WRITE"));
+
+ // Valid table type will be ok
+ this.conf.set(FlinkOptions.TABLE_TYPE, "COPY_ON_WRITE");
+ final MockContext sourceContext5 = MockContext.getInstance(this.conf,
schema, "f2");
+ assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(sourceContext5));
}
@Test