mxm commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1756999617


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
 
        // Cap parallelism at either maxParallelism(number of key groups or source partitions) or
        // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
        // Apply min/max parallelism
        newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
        var adjustByMaxParallelism =
                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
        if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
        }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+            // we try to adjust the parallelism such that it divides the maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions at a vertex,
+            // adjust the parallelism such that it divides the numPartitions without a remainder
+            // => Data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }

Review Comment:
   Can we call them like this?
   
   1. adjustableMaxParallelism => numKeyGroupsOrPartitions
   2. adjustableUpperBound =>  upperBoundForAlignment  
   
   Adjustable just doesn't tell someone who is unfamiliar with the code very much.
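   
   For anyone skimming the thread, here is a minimal standalone sketch of the divisor-alignment idea the loop above implements, written with the names suggested in this comment. The `align` helper, its signature, and the sample values are illustrative only and are not the actual code in `JobVertexScaler`:
   
   ```java
   /** Standalone sketch (not the PR code) of aligning parallelism to a divisor of the partition/key-group count. */
   public final class ParallelismAlignmentSketch {

       /**
        * Scans upward from the computed parallelism for the smallest value that divides
        * numKeyGroupsOrPartitions evenly, so data spreads uniformly across subtasks.
        * Falls back to the originally computed parallelism if no divisor is found
        * within the upper bound used for alignment.
        */
       static int align(int computedParallelism, int numKeyGroupsOrPartitions, int upperBoundForAlignment) {
           for (int p = computedParallelism; p <= upperBoundForAlignment && p <= numKeyGroupsOrPartitions; p++) {
               if (numKeyGroupsOrPartitions % p == 0) {
                   return p;
               }
           }
           return computedParallelism;
       }

       public static void main(String[] args) {
           // With 128 key groups and a computed parallelism of 7, the scan settles on 8:
           // 128 / 8 = 16 key groups per subtask, no skew.
           System.out.println(align(7, 128, 64)); // prints 8
       }
   }
   ```
   
   With names like `numKeyGroupsOrPartitions` and `upperBoundForAlignment`, the intent of the loop reads directly from the code, which is the point of the rename.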


