This is an automated email from the ASF dual-hosted git repository. codope pushed a commit to branch release-0.12.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 6234064d141040cd0f84b2ecedc61607ae2378eb
Author: F7753 <[email protected]>
AuthorDate: Wed Aug 3 12:32:32 2022 +0800

    [HUDI-4477] Adjust partition number of flink sink task (#6218)

    Co-authored-by: lewinma <[email protected]>
---
 .../src/main/java/org/apache/hudi/sink/utils/Pipelines.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 31355255f9..f89bdb2606 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -56,7 +56,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
@@ -136,8 +135,8 @@ public class Pipelines {
         // shuffle by partition keys
         // use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
         // see BatchExecutionUtils#applyBatchExecutionSettings for details.
-        Partitioner<String> partitioner = (key, channels) ->
-            KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
+        Partitioner<String> partitioner = (key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
+            KeyGroupRangeAssignment.computeDefaultMaxParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)), channels);
         dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
       }
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
