This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new aff1a1e  [HUDI-2171] Add parallelism conf for bootstrap operator
     new 632bfd1  Merge pull request #3268 from yuzhaojing/HUDI-2171
aff1a1e is described below

commit aff1a1ed29a5a64932c64195be744969fcc0b08b
Author: 喻兆靖 <[email protected]>
AuthorDate: Tue Jul 13 17:55:12 2021 +0800

    [HUDI-2171] Add parallelism conf for bootstrap operator
---
 .../apache/hudi/configuration/FlinkOptions.java    |  8 ++++-
 .../apache/hudi/streamer/FlinkStreamerConfig.java  |  4 +++
 .../apache/hudi/streamer/HoodieFlinkStreamer.java  | 38 ++++++++++++----------
 .../org/apache/hudi/table/HoodieTableSink.java     |  4 ++-
 4 files changed, 35 insertions(+), 19 deletions(-)

diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java 
b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b8a68e1..7254f3c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -286,8 +286,14 @@ public class FlinkOptions {
       .defaultValue(KeyGeneratorType.SIMPLE.name())
       .withDescription("Key generator type, that implements will extract the 
key out of incoming record");
 
+  public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = 
ConfigOptions
+      .key("write.index_bootstrap.tasks")
+      .intType()
+      .defaultValue(4)
+      .withDescription("Parallelism of tasks that do index bootstrap, default 
is 4");
+
   public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
-      .key("bucket_assign.tasks")
+      .key("write.bucket_assign.tasks")
       .intType()
       .defaultValue(4)
       .withDescription("Parallelism of tasks that do bucket assign, default is 
4");
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 83454b1..d81fd3d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
   @Parameter(names = {"--help", "-h"}, help = true)
   public Boolean help = false;
 
+  @Parameter(names = {"--index-bootstrap-num"}, description = "Parallelism of 
tasks that do index bootstrap, default is 4.")
+  public Integer indexBootstrapNum = 4;
+
   @Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of 
tasks that do bucket assign, default is 4.")
   public Integer bucketAssignNum = 4;
 
@@ -316,6 +319,7 @@ public class FlinkStreamerConfig extends Configuration {
     } else {
       conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
     }
+    conf.setInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS, 
config.indexBootstrapNum);
     conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
     conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
     conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, 
config.partitionDefaultName);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java 
b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index e229168..f8cf840 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -52,7 +52,7 @@ import java.util.Properties;
 /**
  * An Utility which can incrementally consume data from Kafka and apply it to 
the target table.
  * currently, it only support COW table and insert, upsert operation.
- *
+ * <p>
  * note: HoodieFlinkStreamer is not suitable to initialize on large tables 
when we have no checkpoint to restore from.
  */
 public class HoodieFlinkStreamer {
@@ -98,9 +98,13 @@ public class HoodieFlinkStreamer {
         .uid("uid_kafka_source")
         .map(new RowDataToHoodieFunction<>(rowType, conf), 
TypeInformation.of(HoodieRecord.class));
     if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
-      hoodieDataStream = 
hoodieDataStream.rebalance().transform("index_bootstrap",
+      hoodieDataStream = hoodieDataStream.rebalance()
+          .transform(
+              "index_bootstrap",
               TypeInformation.of(HoodieRecord.class),
-              new ProcessOperator<>(new BootstrapFunction<>(conf)));
+              new ProcessOperator<>(new BootstrapFunction<>(conf)))
+          .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
+          .uid("uid_index_bootstrap_" + 
conf.getString(FlinkOptions.TABLE_NAME));
     }
 
     DataStream<Object> pipeline = hoodieDataStream
@@ -119,22 +123,22 @@ public class HoodieFlinkStreamer {
         .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
     if (StreamerUtil.needsAsyncCompaction(conf)) {
       pipeline.transform("compact_plan_generate",
-              TypeInformation.of(CompactionPlanEvent.class),
-              new CompactionPlanOperator(conf))
-              .uid("uid_compact_plan_generate")
-              .setParallelism(1) // plan generate must be singleton
-              .rebalance()
-              .transform("compact_task",
-                      TypeInformation.of(CompactionCommitEvent.class),
-                      new ProcessOperator<>(new CompactFunction(conf)))
-              .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
-              .addSink(new CompactionCommitSink(conf))
-              .name("compact_commit")
-              .setParallelism(1); // compaction commit should be singleton
+          TypeInformation.of(CompactionPlanEvent.class),
+          new CompactionPlanOperator(conf))
+          .uid("uid_compact_plan_generate")
+          .setParallelism(1) // plan generate must be singleton
+          .rebalance()
+          .transform("compact_task",
+              TypeInformation.of(CompactionCommitEvent.class),
+              new ProcessOperator<>(new CompactFunction(conf)))
+          .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+          .addSink(new CompactionCommitSink(conf))
+          .name("compact_commit")
+          .setParallelism(1); // compaction commit should be singleton
     } else {
       pipeline.addSink(new CleanFunction<>(conf))
-              .setParallelism(1)
-              .name("clean_commits").uid("uid_clean_commits");
+          .setParallelism(1)
+          .name("clean_commits").uid("uid_clean_commits");
     }
 
     env.execute(cfg.targetTableName);
diff --git 
a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java 
b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 2c39c92..ad42d0c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -80,9 +80,11 @@ public class HoodieTableSink implements DynamicTableSink, 
SupportsPartitioning,
       // TODO: This is a very time-consuming operation, will optimization
       if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
         hoodieDataStream = hoodieDataStream.rebalance()
-            .transform("index_bootstrap",
+            .transform(
+                "index_bootstrap",
                 TypeInformation.of(HoodieRecord.class),
                 new ProcessOperator<>(new BootstrapFunction<>(conf)))
+            
.setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
             .uid("uid_index_bootstrap_" + 
conf.getString(FlinkOptions.TABLE_NAME));
       }
 

Reply via email to