This is an automated email from the ASF dual-hosted git repository.

wangxianghu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 67c3124  [HUDI-2032] Make keygen class and keygen type optional for FlinkStreamerConfig (#3104)
67c3124 is described below

commit 67c31243522268002c8b1bf5285dfe3c07cf5783
Author: vinoyang <[email protected]>
AuthorDate: Thu Jun 17 21:22:13 2021 +0800

    [HUDI-2032] Make keygen class and keygen type optional for FlinkStreamerConfig (#3104)
    
    * [HUDI-2032] Make keygen class and keygen type optional for FlinkStreamerConfig
    
    * Address the review suggestion
---
 .../src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 843e9bf..59534cf 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -21,6 +21,7 @@ package org.apache.hudi.streamer;
 import org.apache.hudi.client.utils.OperationConverter;
 import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.hudi.util.StreamerUtil;
@@ -155,6 +156,11 @@ public class FlinkStreamerConfig extends Configuration {
     conf.setString(FlinkOptions.RECORD_KEY_FIELD, config.recordKeyField);
     conf.setString(FlinkOptions.PARTITION_PATH_FIELD, config.partitionPathField);
     conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
+    if (!StringUtils.isNullOrEmpty(config.keygenClass)) {
+      conf.setString(FlinkOptions.KEYGEN_CLASS, config.keygenClass);
+    } else {
+      conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
+    }
     conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
 
     return conf;

Reply via email to