1996fanrui commented on code in PR #783:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/783#discussion_r1500874435


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -280,11 +283,14 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(minParallelism, newParallelism), 
upperBound);
 
-        // Try to adjust the parallelism such that it divides the number of 
key groups without a
-        // remainder => state is evenly spread across subtasks
-        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; 
p++) {
-            if (numKeyGroups % p == 0) {
-                return p;
+        // When the shuffle type of vertex data source contains keyBy, we try 
to adjust the
+        // parallelism such that it divides the number of key groups without a 
remainder =>
+        // data is evenly spread across subtasks
+        if (hasKeyBy) {

Review Comment:
   Hi, @gyfora @mxm . During I test autoscaler, I found the parallelissm 
adjustment should not take effect for non-keyed operator.
   
   It's better to include this optimization in 1.8.0. Would you mind helping 
take a look in your free time? thanks in advanced~
   
   BTW, how about rename  `hasKeyBy` to `isKeyedVertex`?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to