This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 834cf4a63ce [HUDI-7317] FlinkTableFactory snatifyCheck should contains
index type (#10541)
834cf4a63ce is described below
commit 834cf4a63ceb1193025010d3f8f0cc065e23c46d
Author: xuzifu666 <[email protected]>
AuthorDate: Mon Jan 22 13:29:29 2024 +0800
[HUDI-7317] FlinkTableFactory snatifyCheck should contains index type
(#10541)
Co-authored-by: xuyu <[email protected]>
---
.../org/apache/hudi/table/HoodieTableFactory.java | 12 +++++++++++
.../apache/hudi/table/TestHoodieTableFactory.java | 25 ++++++++++++++++++++++
2 files changed, 37 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 6751083a5cd..68642b39da8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -22,6 +22,7 @@ import org.apache.hudi.avro.AvroSchemaUtils;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.HadoopConfigurations;
import org.apache.hudi.configuration.OptionsResolver;
@@ -176,6 +177,7 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
*/
private void sanityCheck(Configuration conf, ResolvedSchema schema) {
checkTableType(conf);
+ checkIndexType(conf);
if (!OptionsResolver.isAppendMode(conf)) {
checkRecordKey(conf, schema);
@@ -183,6 +185,16 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
StreamerUtil.checkPreCombineKey(conf, schema.getColumnNames());
}
+ /**
+ * Validate the index type.
+ */
+ private void checkIndexType(Configuration conf) {
+ String indexType = conf.get(FlinkOptions.INDEX_TYPE);
+ if (!StringUtils.isNullOrEmpty(indexType)) {
+ HoodieIndexConfig.INDEX_TYPE.checkValues(indexType);
+ }
+ }
+
/**
* Validate the table type.
*/
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 64145abd5bb..6469fb5c634 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -191,6 +191,31 @@ public class TestHoodieTableFactory {
assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(sourceContext6));
}
+ @Test
+ void testIndexTypeCheck() {
+ ResolvedSchema schema = SchemaBuilder.instance()
+ .field("f0", DataTypes.INT().notNull())
+ .field("f1", DataTypes.VARCHAR(20))
+ .field("f2", DataTypes.TIMESTAMP(3))
+ .field("ts", DataTypes.TIMESTAMP(3))
+ .primaryKey("f0")
+ .build();
+
+ // Index type unset. The default value will be ok
+ final MockContext sourceContext1 = MockContext.getInstance(this.conf,
schema, "f2");
+ assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(sourceContext1));
+
+ // Invalid index type will throw exception
+ this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET_AA");
+ final MockContext sourceContext2 = MockContext.getInstance(this.conf,
schema, "f2");
+ assertThrows(IllegalArgumentException.class, () -> new
HoodieTableFactory().createDynamicTableSink(sourceContext2));
+
+ // Valid index type will be ok
+ this.conf.set(FlinkOptions.INDEX_TYPE, "BUCKET");
+ final MockContext sourceContext3 = MockContext.getInstance(this.conf,
schema, "f2");
+ assertDoesNotThrow(() -> new
HoodieTableFactory().createDynamicTableSink(sourceContext3));
+ }
+
@Test
void testTableTypeCheck() {
ResolvedSchema schema = SchemaBuilder.instance()