mxm commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1757005246


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +413,51 @@ protected static int scale(
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int adjustableMaxParallelism = maxParallelism;
+        int adjustableUpperBound;
+        if (numPartitions <= 0) {
+            adjustableUpperBound = Math.min(maxParallelism / 2, upperBound);
+        } else {
+            adjustableUpperBound = Math.min(numPartitions, upperBound);
+            adjustableMaxParallelism = numPartitions;
+        }
+
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= adjustableUpperBound; p++) {
+            if (adjustableMaxParallelism % p == 0) {
                 return p;
             }
         }
 
+        if (numPartitions > 0) {
+
+            // When the rounded-up parallelism cannot evenly divide the source
+            // numPartitions, try to find the smallest parallelism that can still
+            // satisfy the current consumption rate.
+            int finalParallelism = newParallelism;
+            for (; finalParallelism > parallelismLowerLimit; finalParallelism--) {
+                if (numPartitions / finalParallelism > numPartitions / newParallelism) {
+                    if (numPartitions % finalParallelism != 0) {
+                        finalParallelism++;
+                    }
+                    break;
+                }
+            }
+            consumer.accept(
+                    String.format(
+                            SCALE_LIMITED_MESSAGE_FORMAT,
+                            vertex,
+                            newParallelism,
+                            finalParallelism,
+                            String.format(
+                                    "numPartitions: %s, upperBound(maxParallelism or parallelismUpperLimit): %s, "
+                                            + "parallelismLowerLimit: %s.",
+                                    numPartitions, upperBound, parallelismLowerLimit)));

Review Comment:
   I would prefer if we worked on the EventHandler directly here instead of 
going through the consumer. The reason is that I would like to keep event 
generation code in one place.
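
   For illustration, a minimal sketch of what that could look like, assuming the scale() path gets access to the JobVertexScaler's AutoScalerEventHandler and the job context instead of a Consumer<String>. The "ScalingLimited" reason string and the exact handleEvent parameter list are assumptions here, not the final API:
   ```java
   // Hypothetical sketch: report the "scaling limited" adjustment directly through
   // the AutoScalerEventHandler rather than through a callback. The handleEvent
   // parameter list (context, type, reason, message, messageKey, interval) is
   // assumed and may differ from the actual interface.
   String message =
           String.format(
                   SCALE_LIMITED_MESSAGE_FORMAT,
                   vertex,
                   newParallelism,
                   finalParallelism,
                   String.format(
                           "numPartitions: %s, upperBound(maxParallelism or parallelismUpperLimit): %s, "
                                   + "parallelismLowerLimit: %s.",
                           numPartitions, upperBound, parallelismLowerLimit));

   autoScalerEventHandler.handleEvent(
           context,
           AutoScalerEventHandler.Type.Normal,
           "ScalingLimited", // hypothetical reason constant
           message,
           null, // messageKey
           null); // interval
   ```
   That would keep the message formatting and the event emission next to the other events this class reports, which is the "one place" mentioned above.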



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
