This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d994c58cc0 [HUDI-3946] Validate option path in flink hudi sink (#5397)
d994c58cc0 is described below
commit d994c58cc020e645e3cc4f71511e5bb5ff8dc3c3
Author: Ruguo Yu <[email protected]>
AuthorDate: Mon Apr 25 10:13:47 2022 +0800
[HUDI-3946] Validate option path in flink hudi sink (#5397)
---
.../src/main/java/org/apache/hudi/table/HoodieTableFactory.java | 5 +++++
1 file changed, 5 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 7543382e19..5464ea3f20 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.configuration.OptionsResolver;
import org.apache.hudi.exception.HoodieValidationException;
@@ -53,6 +54,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Set;
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
/**
* Hoodie data source/sink factory.
*/
@@ -81,6 +84,8 @@ public class HoodieTableFactory implements
DynamicTableSourceFactory, DynamicTab
@Override
public DynamicTableSink createDynamicTableSink(Context context) {
Configuration conf =
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
+
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
+ "Option [path] should not be empty.");
ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
sanityCheck(conf, schema);
setupConfOptions(conf, context.getObjectIdentifier().getObjectName(),
context.getCatalogTable(), schema);