This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new bda3db078e support general parameter 'sink.parallelism' for flink-hudi 
(#5405)
bda3db078e is described below

commit bda3db078e927421c10932cfcb3019cfddb125b6
Author: hehuiyuan <[email protected]>
AuthorDate: Sun Apr 24 19:09:39 2022 +0800

    support general parameter 'sink.parallelism' for flink-hudi (#5405)
    
    Co-authored-by: hehuiyuan1 <[email protected]>
---
 .../src/main/java/org/apache/hudi/configuration/FlinkOptions.java      | 3 +++
 .../src/main/java/org/apache/hudi/table/HoodieTableSink.java           | 3 ++-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index e2be7d364b..c6b16e6ecd 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -34,6 +34,7 @@ import org.apache.hudi.keygen.constant.KeyGeneratorType;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.factories.FactoryUtil;
 
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -232,6 +233,8 @@ public class FlinkOptions extends HoodieConfig {
   // ------------------------------------------------------------------------
   //  Write Options
   // ------------------------------------------------------------------------
+  public static final ConfigOption<Integer> SINK_PARALLELISM = 
FactoryUtil.SINK_PARALLELISM;
+
   public static final ConfigOption<String> TABLE_NAME = ConfigOptions
       .key(HoodieWriteConfig.TBL_NAME.key())
       .stringType()
diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 4dd4f89d03..46e360d3a2 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -82,7 +82,8 @@ public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning,
       }
 
       // default parallelism
-      int parallelism = dataStream.getExecutionConfig().getParallelism();
+      int parallelism = conf.getInteger(FlinkOptions.SINK_PARALLELISM,
+              dataStream.getExecutionConfig().getParallelism());
       DataStream<Object> pipeline;
       // bootstrap
       final DataStream<HoodieRecord> hoodieRecordDataStream =

Reply via email to