1996fanrui commented on code in PR #21443:
URL: https://github.com/apache/flink/pull/21443#discussion_r1042919428
##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java:
##########
@@ -1614,6 +1617,15 @@
List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters
return recordWriters;
}
+ private static void replaceForwardPartitionerIfConsumerParallelismDoesNotMatch(
+ Environment environment, NonChainedOutput streamOutput) {
+ if (streamOutput.getPartitioner() instanceof ForwardPartitioner
+ && streamOutput.getConsumerParallelism() != environment.getTaskInfo().getNumberOfParallelSubtasks()) {
+ streamOutput.setPartitioner(new RescalePartitioner<>());
Review Comment:
Thanks for your feedback. In most scenarios, `RescalePartitioner` works
well. However, I'm not sure it is general enough.
For example, suppose a job has 2 vertices, both with a parallelism of 200,
connected by a `ForwardPartitioner`. Autoscaling finds that vertex2 performs
poorly, so it increases the parallelism of vertex2 from 200 to 300 and leaves
the parallelism of vertex1 unchanged.
With the `RescalePartitioner`, 100 subtasks of vertex1 will each send data to
one downstream subtask, and the remaining 100 subtasks of vertex1 will each
send data to 2 downstream subtasks.
Half of the source subtasks (those that each feed only one downstream subtask)
may therefore still have lag, because their downstream subtask receives the
same amount of data as before scaling.
I'm not sure whether this will happen in practice.
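The fan-out arithmetic above can be checked with a small sketch. It assumes (this is not confirmed in the thread) that a rescale edge from `s` upstream to `t` downstream subtasks, with `s <= t`, gives upstream subtask `i` the downstream range `[ceil(i*t/s), ceil((i+1)*t/s))`. The class `RescaleFanOut` and its methods are hypothetical names used only for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch: counts how many downstream subtasks each upstream subtask
 * feeds over a pointwise (rescale) edge. Assumes consumer ranges are assigned as
 * [ceil(i * t / s), ceil((i + 1) * t / s)) for upstream index i.
 */
final class RescaleFanOut {

    /** Ceiling of a / b for non-negative a and positive b. */
    static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    /** Returns, for each upstream subtask, the number of downstream subtasks it feeds. */
    static int[] fanOut(int upstream, int downstream) {
        if (upstream <= 0 || upstream > downstream) {
            throw new IllegalArgumentException("sketch only covers 0 < upstream <= downstream");
        }
        int[] result = new int[upstream];
        for (int i = 0; i < upstream; i++) {
            int start = ceilDiv(i * downstream, upstream);
            int end = ceilDiv((i + 1) * downstream, upstream);
            result[i] = end - start;
        }
        return result;
    }

    public static void main(String[] args) {
        int[] fanOut = fanOut(200, 300);
        // Histogram: fan-out -> number of upstream subtasks with that fan-out.
        Map<Integer, Integer> histogram = new TreeMap<>();
        for (int f : fanOut) {
            histogram.merge(f, 1, Integer::sum);
        }
        System.out.println(histogram); // {1=100, 2=100}
        // An upstream subtask with fan-out 1 still sends its whole output to a single
        // downstream subtask, so that consumer's load is unchanged by the rescale.
        System.out.println("downstream subtasks with unchanged load: " + histogram.getOrDefault(1, 0));
    }
}
```

Under this assumed assignment, the single-consumer and two-consumer upstream subtasks alternate (fan-out 2, 1, 2, 1, ...) rather than forming a contiguous block, but the 100/100 split described above is the same.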