This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch release-0.12.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 6234064d141040cd0f84b2ecedc61607ae2378eb
Author: F7753 <[email protected]>
AuthorDate: Wed Aug 3 12:32:32 2022 +0800

    [HUDI-4477] Adjust partition number of flink sink task (#6218)
    
    Co-authored-by: lewinma <[email protected]>
---
 .../src/main/java/org/apache/hudi/sink/utils/Pipelines.java          | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 31355255f9..f89bdb2606 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -56,7 +56,6 @@ import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.table.data.RowData;
@@ -136,8 +135,8 @@ public class Pipelines {
         // shuffle by partition keys
         // use #partitionCustom instead of #keyBy to avoid duplicate sort operations,
         // see BatchExecutionUtils#applyBatchExecutionSettings for details.
-        Partitioner<String> partitioner = (key, channels) ->
-            KeyGroupRangeAssignment.assignKeyToParallelOperator(key, StreamGraphGenerator.DEFAULT_LOWER_BOUND_MAX_PARALLELISM, channels);
+        Partitioner<String> partitioner = (key, channels) -> KeyGroupRangeAssignment.assignKeyToParallelOperator(key,
+                KeyGroupRangeAssignment.computeDefaultMaxParallelism(conf.getInteger(FlinkOptions.WRITE_TASKS)), channels);
         dataStream = dataStream.partitionCustom(partitioner, rowDataKeyGen::getPartitionPath);
       }
       if (conf.getBoolean(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {

Reply via email to