This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d994c58cc0 [HUDI-3946] Validate option path in flink hudi sink (#5397)
d994c58cc0 is described below

commit d994c58cc020e645e3cc4f71511e5bb5ff8dc3c3
Author: Ruguo Yu <[email protected]>
AuthorDate: Mon Apr 25 10:13:47 2022 +0800

    [HUDI-3946] Validate option path in flink hudi sink (#5397)
---
 .../src/main/java/org/apache/hudi/table/HoodieTableFactory.java      | 5 +++++
 1 file changed, 5 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index 7543382e19..5464ea3f20 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -20,6 +20,7 @@ package org.apache.hudi.table;
 
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
 import org.apache.hudi.exception.HoodieValidationException;
@@ -53,6 +54,8 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
+
 /**
  * Hoodie data source/sink factory.
  */
@@ -81,6 +84,8 @@ public class HoodieTableFactory implements 
DynamicTableSourceFactory, DynamicTab
   @Override
   public DynamicTableSink createDynamicTableSink(Context context) {
     Configuration conf = 
FlinkOptions.fromMap(context.getCatalogTable().getOptions());
+    
checkArgument(!StringUtils.isNullOrEmpty(conf.getString(FlinkOptions.PATH)),
+        "Option [path] should not be empty.");
     ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
     sanityCheck(conf, schema);
     setupConfOptions(conf, context.getObjectIdentifier().getObjectName(), 
context.getCatalogTable(), schema);

Reply via email to