This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 9b01d2a [HUDI-2142] Support setting bucket assign parallelism for flink write task (#3239)
9b01d2a is described below
commit 9b01d2a04520db6230cd16ef2b29013c013b1944
Author: swuferhong <[email protected]>
AuthorDate: Sat Jul 10 15:43:36 2021 +0800
[HUDI-2142] Support setting bucket assign parallelism for flink write task (#3239)
---
.../src/main/java/org/apache/hudi/configuration/FlinkOptions.java | 6 ++++++
.../src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java | 4 ++++
.../src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java | 4 ++--
hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java | 4 ++--
4 files changed, 14 insertions(+), 4 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
index 1af4459..b8a68e1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java
@@ -286,6 +286,12 @@ public class FlinkOptions {
.defaultValue(KeyGeneratorType.SIMPLE.name())
.withDescription("Key generator type, that implements will extract the key out of incoming record");
+ public static final ConfigOption<Integer> BUCKET_ASSIGN_TASKS = ConfigOptions
+ .key("bucket_assign.tasks")
+ .intType()
+ .defaultValue(4)
+ .withDescription("Parallelism of tasks that do bucket assign, default is 4");
+
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
.key("write.tasks")
.intType()
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
index c024d7c..83454b1 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/FlinkStreamerConfig.java
@@ -126,6 +126,9 @@ public class FlinkStreamerConfig extends Configuration {
@Parameter(names = {"--help", "-h"}, help = true)
public Boolean help = false;
+ @Parameter(names = {"--bucket-assign-num"}, description = "Parallelism of tasks that do bucket assign, default is 4.")
+ public Integer bucketAssignNum = 4;
+
@Parameter(names = {"--write-task-num"}, description = "Parallelism of tasks that do actual write, default is 4.")
public Integer writeTaskNum = 4;
@@ -313,6 +316,7 @@ public class FlinkStreamerConfig extends Configuration {
} else {
conf.setString(FlinkOptions.KEYGEN_TYPE, config.keygenType);
}
+ conf.setInteger(FlinkOptions.BUCKET_ASSIGN_TASKS, config.bucketAssignNum);
conf.setInteger(FlinkOptions.WRITE_TASKS, config.writeTaskNum);
conf.setString(FlinkOptions.PARTITION_DEFAULT_NAME, config.partitionDefaultName);
conf.setBoolean(FlinkOptions.INDEX_BOOTSTRAP_ENABLED, config.indexBootstrapEnabled);
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
index cc5f6c0..e229168 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamer.java
@@ -82,7 +82,6 @@ public class HoodieFlinkStreamer {
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(cfg)).getLogicalType();
Configuration conf = FlinkStreamerConfig.toFlinkConfig(cfg);
- int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASKS);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
new StreamWriteOperatorFactory<>(conf);
@@ -111,12 +110,13 @@ public class HoodieFlinkStreamer {
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new KeyedProcessOperator<>(new BucketAssignFunction<>(conf)))
+ .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
.uid("uid_bucket_assigner")
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
.uid("uid_hoodie_stream_write")
- .setParallelism(numWriteTask);
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
if (StreamerUtil.needsAsyncCompaction(conf)) {
pipeline.transform("compact_plan_generate", TypeInformation.of(CompactionPlanEvent.class),
diff --git a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
index 161fb21..2c39c92 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableSink.java
@@ -69,7 +69,6 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
return (DataStreamSinkProvider) dataStream -> {
// Read from kafka source
RowType rowType = (RowType) schema.toRowDataType().notNull().getLogicalType();
- int numWriteTasks = conf.getInteger(FlinkOptions.WRITE_TASKS);
long ckpTimeout = dataStream.getExecutionEnvironment()
.getCheckpointConfig().getCheckpointTimeout();
conf.setLong(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT, ckpTimeout);
@@ -94,11 +93,12 @@ public class HoodieTableSink implements DynamicTableSink, SupportsPartitioning,
"bucket_assigner",
TypeInformation.of(HoodieRecord.class),
new BucketAssignOperator<>(new BucketAssignFunction<>(conf)))
+ .setParallelism(conf.getInteger(FlinkOptions.BUCKET_ASSIGN_TASKS))
.uid("uid_bucket_assigner_" +
conf.getString(FlinkOptions.TABLE_NAME))
// shuffle by fileId(bucket id)
.keyBy(record -> record.getCurrentLocation().getFileId())
.transform("hoodie_stream_write", TypeInformation.of(Object.class), operatorFactory)
- .setParallelism(numWriteTasks);
+ .setParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS));
if (StreamerUtil.needsAsyncCompaction(conf)) {
return pipeline.transform("compact_plan_generate",
TypeInformation.of(CompactionPlanEvent.class),