1996fanrui commented on code in PR #783:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/783#discussion_r1500874435
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -280,11 +283,14 @@ protected static int scale(
// Apply min/max parallelism
newParallelism = Math.min(Math.max(minParallelism, newParallelism),
upperBound);
- // Try to adjust the parallelism such that it divides the number of
key groups without a
- // remainder => state is evenly spread across subtasks
- for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound;
p++) {
- if (numKeyGroups % p == 0) {
- return p;
+ // When the shuffle type of vertex data source contains keyBy, we try
to adjust the
+ // parallelism such that it divides the number of key groups without a
remainder =>
+ // data is evenly spread across subtasks
+ if (hasKeyBy) {
Review Comment:
Hi, @gyfora @mxm . During I test autoscaler, I found the parallelissm
adjustment should not take effect for non-keyed operator.
It's better to include this optimization in 1.8.0. Would you mind helping
take a look in your free time? thanks in advanced~
BTW, how about rename `hasKeyBy` to `isKeyedVertex`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]