1996fanrui commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1766114467
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########

@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);

Review Comment:
   Hey @mxm, sorry for the late reply. There were too many comments, so I missed this one.

   > @1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80.
   >
   > Are you asking to expand the source logic introduced here to hash keyed state?

   I'm asking about hash keyed state: I don't understand why we recommend 80 instead of 100 for the hash keyed state case. But I think you have answered my question in this comment:

   > > For this scenario, there is no difference when the parallelism is set to 50 and 99.
   >
   > That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.

   Good point! Hot keys can occur in a Flink job; source partitions, in general, are not data-skewed.

   > However, when `parallelism > maxParallelism / 2`, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to `maxParallelism` for the initial `parallelism > maxParallelism / 2`. We could just skip this (premature) optimization entirely.

   This is exactly my question: I think we should use `maxParallelism` as the final parallelism (for the above example, 100 instead of 80).

   > ```java
   > upperBoundForAlignment = Math.min(
   >     // Optimize the case where newParallelism <= maxParallelism / 2
   >     newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2,
   >     upperBound
   > );
   > ```

   Great, this part solves my concern. Thank you~

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
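[Editor's note] The divisor-alignment behavior discussed above can be sketched as a standalone method. This is an illustrative reconstruction, not the actual `JobVertexScaler` code: the class and method names (`KeyGroupAlignmentSketch`, `align`) are hypothetical, and the search bound incorporates the fix proposed in this thread (searching up to `maxParallelism` once `newParallelism > maxParallelism / 2`).

```java
// Hypothetical sketch of the key-group/partition alignment discussed in this
// thread. Not the real JobVertexScaler implementation.
public final class KeyGroupAlignmentSketch {

    /**
     * Tries to round newParallelism up to the smallest divisor of
     * maxParallelism in range, so that key groups (or source partitions)
     * spread evenly across subtasks.
     */
    static int align(int newParallelism, int maxParallelism, int upperBound) {
        // Once newParallelism exceeds maxParallelism / 2, the only remaining
        // divisor of maxParallelism is maxParallelism itself, so the search
        // must go all the way up to it (the fix proposed in the thread).
        int upperBoundForAlignment =
                Math.min(
                        newParallelism > maxParallelism / 2
                                ? maxParallelism
                                : maxParallelism / 2,
                        upperBound);
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (maxParallelism % p == 0) {
                return p;
            }
        }
        // No divisor found within the bound; keep the requested parallelism.
        return newParallelism;
    }

    public static void main(String[] args) {
        // The example from the thread: maxParallelism=100, newParallelism=80.
        // With the old bound (maxParallelism / 2 = 50) the loop never ran and
        // 80 was kept; with the fixed bound the search reaches 100.
        System.out.println(align(80, 100, 200)); // prints 100
        System.out.println(align(30, 100, 200)); // prints 50 (100 % 50 == 0)
    }
}
```

With maxParallelism=100 and newParallelism=80, each of 80 subtasks would otherwise get either 1 or 2 of the 100 key groups (up to 2x skew); aligning to 100 gives exactly one key group per subtask.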