1996fanrui commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1759645183
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);

Review Comment:
As I understand it, key groups are entirely analogous to source partitions (Kafka or Pulsar): they determine how many partitions or key groups each parallel instance consumes, and the load can be unbalanced even without large state.

For example, suppose the maxParallelism (number of key groups) is 100 and the actual parallelism is 70:
- 30 instances process 2 key groups each, and the remaining 40 instances process 1 key group each.
- Assuming the data of each key group is balanced, the 30 instances processing 2 key groups become the bottleneck of the job.

In this scenario there is no difference between setting the parallelism to 50 or to 99: the bottleneck subtask still processes 2 key groups either way.

IIUC, this situation is exactly the source partition problem you want to solve, and it applies in exactly the same way to key groups. Please correct me if anything is wrong.
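To make the arithmetic above concrete, here is a small standalone sketch (not part of the PR; the class name `KeyGroupSkewDemo` is made up for illustration). It assumes key groups are spread as evenly as possible across subtasks, which is how Flink's key-group range assignment behaves, and prints the per-subtask load for a few parallelism values:

```java
/**
 * Standalone illustration: how 100 key groups (maxParallelism) spread across
 * subtasks for different parallelism values, assuming an even range assignment.
 */
public class KeyGroupSkewDemo {

    public static void main(String[] args) {
        int maxParallelism = 100; // number of key groups
        for (int parallelism : new int[] {50, 70, 99, 100}) {
            // the most loaded subtask gets ceil(maxParallelism / parallelism) key groups
            int bottleneck = (maxParallelism + parallelism - 1) / parallelism;
            // number of subtasks that receive that larger share
            int remainder = maxParallelism % parallelism;
            int heavySubtasks = remainder == 0 ? parallelism : remainder;
            System.out.printf(
                    "parallelism=%3d -> %3d subtask(s) process %d key group(s) each%n",
                    parallelism, heavySubtasks, bottleneck);
        }
    }
}
```

With maxParallelism = 100 this prints a per-subtask maximum of 2 key groups for parallelism 50, 70, and 99 alike; it only drops to 1 at parallelism 100, which matches the observation above that 50 and 99 perform the same.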