This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 9b01d2a  [HUDI-2142] Support setting bucket assign parallelism for flink write task (#3239)
9b01d2a is described below

commit 9b01d2a04520db6230cd16ef2b29013c013b1944
Author: swuferhong <[email protected]>
AuthorDate: Sat Jul 10 15:43:36 2021 +0800

    [HUDI-2142] Support setting bucket assign parallelism for flink write task (#3239)
---
 .../src/main/java/org/apache/hudi/configuration/FlinkOptions.java   | 6 ++++++
 .../src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java | 4 ++++
 .../src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 4 ++--
 hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java | 4 ++--
 4 files changed, 14 insertions(+), 4 deletions(-)
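
As an illustration only (not part of the patch): the new knob is set on the job
configuration the same way as write.tasks, so bucket assignment and the actual
write can be scaled independently. The parallelism values below are made up:

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;

    Configuration conf = new Configuration();
    // If bucket lookup is the bottleneck, raise only this knob.
    conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, 8); // defaults to 4
    conf.setInteger(FlinkOptions.WRITE_TASKS, 4);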

diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 1af4459..b8a68e1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -286,6 +286,12 @@ public class FlinkOptions {
       .defaultValue(KeyGeneratorType.SIMPLE.name())
       .withDescription("Key generator type, that implements will extract the 
key out of incoming record");
 
+  public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
+      .key("bucket_assign.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do bucket assign, default is 
4");
+
   public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
       .key("write.tasks")
       .intType()
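
Since write.tasks is already exposed as a Hudi connector table option, the new
bucket_assign.tasks key should be settable the same way in SQL DDL; a hedged
sketch, with the table name, schema, and path purely illustrative:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
    tEnv.executeSql(
        "CREATE TABLE hudi_sink ("
            + "  uuid VARCHAR(20), name VARCHAR(10), ts TIMESTAMP(3)"
            + ") WITH ("
            + "  'connector' = 'hudi',"
            + "  'path' = 'file:///tmp/hudi_sink',"
            + "  'bucket_assign.tasks' = '8'," // option added by this patch
            + "  'write.tasks' = '4'"
            + ")");
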
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index c024d7c..83454b1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
+  @Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of 
tasks that do bucket assign, default is 4.")
+  public Integer bucketAssignNum = 4;
+
   @Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks 
that do actual write, default is 4.")
   public Integer writeTaskNum = 4;
 
@@ -313,6 +316,7 @@ public class FlinkStreamerConfig extends Configuration {
     } else {
       conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
     }
+    conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
     conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
    conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
    conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, config.indexBootstrapEnabled);
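
For the standalone streamer, the flag flows through toFlinkConfig() exactly like
--write-task-num. A minimal sketch of the mapping (field assignments stand in for
CLI flags; a real run would also set the source and target options the streamer
requires):

    import org.apache.flink.configuration.Configuration;
    import org.apache.hudi.configuration.FlinkOptions;
    import org.apache.hudi.streamer.FlinkStreamerConfig;

    FlinkStreamerConfig cfg = new FlinkStreamerConfig();
    cfg.bucketAssignNum = 8; // i.e. --bucket-assign-num 8
    cfg.writeTaskNum = 4;    // i.e. --write-task-num 4
    Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
    // conf now carries FlinkOptions.BUCKET_ASSIGN_TASKS = 8 for the pipeline below.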
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index cc5f6c0..e229168 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -82,7 +82,6 @@ public class HoodieFlinkStreamer {
        (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg))
            .getLogicalType();
     Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
-    int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
         new StreamWriteOperatorFactory<>(conf);
 
@@ -111,12 +110,13 @@ public class HoodieFlinkStreamer {
             "bucket_assigner",
             TypeInformation.of(HoodieRecord.class),
             new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+        .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
         .uid("uid_bucket_assigner")
         // shuffle by fileId(bucket id)
         .keyBy(record -> record.getCurrentLocation().getFileId())
         .transform("hoodie_stream_write", TypeInformation.of(Object.class), 
operatorFactory)
         .uid("uid_hoodie_stream_write")
-        .setParallelism(numWriteTask);
+        .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     if (StreamerUtil.needsAsyncCompaction(conf)) {
       pipeline.transform("compact_plan_generate",
               TypeInformation.of(CompactionPlanEvent.class),
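
The resulting operator chain in the streamer, with the two parallelism knobs
applied (a sketch; operator names are the ones registered above):

    source -> ... -> bucket_assigner      parallelism = bucket_assign.tasks (default 4)
                  -> keyBy(fileId)        shuffle records to their bucket's writer
                  -> hoodie_stream_write  parallelism = write.tasks (default 4)

The HoodieTableSink change below wires the same two knobs into the SQL sink path.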
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 161fb21..2c39c92 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -69,7 +69,6 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
     return (DataStreamSinkProvider) dataStream -> {
       // Read from kafka source
      RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
-      int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
       long ckpTimeout = dataStream.getExecutionEnvironment()
           .getCheckpointConfig().getCheckpointTimeout();
       conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
@@ -94,11 +93,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
               "bucket_assigner",
               TypeInformation.of(HoodieRecord.class),
               new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+          .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
           .uid("uid_bucket_assigner_" + 
conf.getString(FlinkOptions.TABLE_NAME))
           // shuffle by fileId(bucket id)
           .keyBy(record -> record.getCurrentLocation().getFileId())
           .transform("hoodie_stream_write", TypeInformation.of(Object.class), 
operatorFactory)
-          .setParallelism(numWriteTasks);
+          .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
       if (StreamerUtil.needsAsyncCompaction(conf)) {
         return pipeline.transform("compact_plan_generate",
             TypeInformation.of(CompactionPlanEvent.class),
