This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new aff1a1e [HUDI-2171] Add parallelism conf for bootstrap operator
new 632bfd1 Merge pull request #3268 from yuzhaojing/HUDI-2171
aff1a1e is described below
commit aff1a1ed29a5a64932c64195be744969fcc0b08b
Author: 喻兆靖 <[email protected]>
AuthorDate: Tue Jul 13 17:55:12 2021 +0800
[HUDI-2171] Add parallelism conf for bootstrap operator
---
.../apache/hudi/configuration/FlinkOptions.java | 8 ++++-
.../apache/hudi/streamer/FlinkStreamerConfig.java | 4 +++
.../apache/hudi/streamer/HoodieFlinkStreamer.java | 38 ++++++++++++----------
.../org/apache/hudi/table/HoodieTableSink.java | 4 ++-
4 files changed, 35 insertions(+), 19 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index b8a68e1..7254f3c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -286,8 +286,14 @@ public class FlinkOptions {
.defaultValue(KeyGeneratorType.SIMPLE.name())
.withDescription("Key generator type, that implements will extract the
key out of incoming record");
+ public static final ConfigOption<Integer> INDEX_BOOTSTRAP_TASKS = ConfigOptions
+ .key("write.index_bootstrap.tasks")
+ .intType()
+ .defaultValue(4)
+ .withDescription("Parallelism of tasks that do index bootstrap, default
is 4");
+
public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
- .key("bucket_assign.tasks")
+ .key("write.bucket_assign.tasks")
.intType()
.defaultValue(4)
.withDescription("Parallelism of tasks that do bucket assign, default is
4");
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index 83454b1..d81fd3d 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
+ @Parameter(names = {"--index-bootstrap-num"}, description = "Parallelism of tasks that do index bootstrap, default is 4.")
+ public Integer indexBootstrapNum = 4;
+
@Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
public Integer bucketAssignNum = 4;
@@ -316,6 +319,7 @@ public class FlinkStreamerConfig extends Configuration {
} else {
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
}
+ conf.setInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS, config.indexBootstrapNum);
conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
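The new flag maps one-to-one onto the FlinkOptions entry. A sketch of the flow, assuming the usual JCommander population of the public fields (the copy into the Flink Configuration is the line added in the hunk above):

    import org.apache.hudi.streamer.FlinkStreamerConfig;

    public class StreamerFlagSketch {
      public static void main(String[] args) {
        // JCommander would normally populate this field from "--index-bootstrap-num 8".
        FlinkStreamerConfig config = new FlinkStreamerConfig();
        config.indexBootstrapNum = 8;
        // The hunk above then copies the value into the Flink Configuration:
        //   conf.setInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS, config.indexBootstrapNum);
      }
    }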
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index e229168..f8cf840 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -52,7 +52,7 @@ import java.util.Properties;
/**
* A utility which can incrementally consume data from Kafka and apply it to the target table.
* Currently, it only supports COW tables and the insert and upsert operations.
- *
+ * <p>
* note: HoodieFlinkStreamer is not suitable for initializing large tables when there is no checkpoint to restore from.
*/
public class HoodieFlinkStreamer {
@@ -98,9 +98,13 @@ public class HoodieFlinkStreamer {
.uid("uid_kafka_source")
.map(new RowDataToHoodieFunction<>(rowType, conf),
TypeInformation.of(HoodieRecord.class));
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
- hoodieDataStream = hoodieDataStream.rebalance().transform("index_bootstrap",
+ hoodieDataStream = hoodieDataStream.rebalance()
+ .transform(
+ "index_bootstrap",
TypeInformation.of(HoodieRecord.class),
- new ProcessOperator<>(new BootstrapFunction<>(conf)));
+ new ProcessOperator<>(new BootstrapFunction<>(conf)))
+ .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
+ .uid("uid_index_bootstrap_" + conf.getString(FlinkOptions.TABLE_NAME));
}
DataStream<Object> pipeline = hoodieDataStream
@@ -119,22 +123,22 @@ public class HoodieFlinkStreamer {
.setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
if (StreamerUtil.needsAsyncCompaction(conf)) {
pipeline.transform("compact_plan_generate",
- TypeInformation.of(CompactionPlanEvent.class),
- new CompactionPlanOperator(conf))
- .uid("uid_compact_plan_generate")
- .setParallelism(1) // plan generate must be singleton
- .rebalance()
- .transform("compact_task",
- TypeInformation.of(CompactionCommitEvent.class),
- new ProcessOperator<>(new CompactFunction(conf)))
- .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
- .addSink(new CompactionCommitSink(conf))
- .name("compact_commit")
- .setParallelism(1); // compaction commit should be singleton
+ TypeInformation.of(CompactionPlanEvent.class),
+ new CompactionPlanOperator(conf))
+ .uid("uid_compact_plan_generate")
+ .setParallelism(1) // plan generate must be singleton
+ .rebalance()
+ .transform("compact_task",
+ TypeInformation.of(CompactionCommitEvent.class),
+ new ProcessOperator<>(new CompactFunction(conf)))
+ .setParallelism(conf.getInteger(FlinkOptions.COMPACTION_TASKS))
+ .addSink(new CompactionCommitSink(conf))
+ .name("compact_commit")
+ .setParallelism(1); // compaction commit should be singleton
} else {
pipeline.addSink(new CleanFunction<>(conf))
- .setParallelism(1)
- .name("clean_commits").uid("uid_clean_commits");
+ .setParallelism(1)
+ .name("clean_commits").uid("uid_clean_commits");
}
env.execute(cfg.targetTableName);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 2c39c92..ad42d0c 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -80,9 +80,11 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
// TODO: This is a very time-consuming operation; optimize it later
if (conf.getBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED)) {
hoodieDataStream = hoodieDataStream.rebalance()
- .transform("index_bootstrap",
+ .transform(
+ "index_bootstrap",
TypeInformation.of(HoodieRecord.class),
new ProcessOperator<>(new BootstrapFunction<>(conf)))
+ .setParallelism(conf.getInteger(FlinkOptions.INDEX_BOOTSTRAP_TASKS))
.uid("uid_index_bootstrap_" +
conf.getString(FlinkOptions.TABLE_NAME));
}
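For the SQL connector path that HoodieTableSink serves, the knob would typically be set as a table option. A hedged sketch: the schema, path, and the 'index.bootstrap.enabled' key are assumptions not shown in this diff; only 'write.index_bootstrap.tasks' comes from the FlinkOptions hunk:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    public class SqlOptionSketch {
      public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
        tEnv.executeSql(
            "CREATE TABLE hudi_sink (id INT, name STRING) WITH ("  // hypothetical schema
                + " 'connector' = 'hudi',"
                + " 'path' = 'file:///tmp/hudi_sink',"             // hypothetical path
                + " 'index.bootstrap.enabled' = 'true',"           // assumed existing key
                + " 'write.index_bootstrap.tasks' = '8'"           // the option added by this commit
                + ")");
      }
    }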