huyuanfeng2018 commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764464407


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -383,23 +403,68 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
-        var adjustByMaxParallelism =
-                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelism) {
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        final int numKeyGroupsOrPartitions;
+        final int upperBoundForAlignment;
+        if (numSourcePartitions <= 0) {
+            numKeyGroupsOrPartitions = maxParallelism;
+            upperBoundForAlignment =
+                    Math.min(
+                            // Optimize the case where newParallelism <= maxParallelism / 2
+                            newParallelism > maxParallelism / 2
+                                    ? maxParallelism
+                                    : maxParallelism / 2,
+                            upperBound);
+        } else {
+            numKeyGroupsOrPartitions = numSourcePartitions;
+            upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
+        }
+
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }
 
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+        // When adjust the parallelism after rounding up cannot be evenly divided by
+        // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
+        // current consumption rate.
+        int p = newParallelism;
+        for (; p > 0; p--) {
+            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
+                if (numKeyGroupsOrPartitions % p != 0) {
+                    p++;
+                }
+                break;
+            }
+        }

Review Comment:
   > While reviewing this part in detail, I found that our discussion does not cover all cases.
   > 
   > For example: the number of source partitions is 199, and the new parallelism is 99. IIUC, the final parallelism is 67 (every subtask consumes 3 source partitions, except for the last subtask), right?
   > 
   > But 100 as the final parallelism makes sense to me (every subtask consumes 2 source partitions, except for the last subtask).
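
To make the numbers in the quoted example concrete, here is a minimal sketch that replays the partition branch of the hunk above for 199 partitions and a computed parallelism of 99. The class name `AlignmentTrace`, the `upperBound` of 150, and the final clamp-and-return are assumptions for illustration: the hunk ends before the real return statement, and with an upper bound of 199 or more the upward loop would simply return 199.

```java
final class AlignmentTrace {

    // Replays only the source-partition branch visible in the diff.
    static int align(int numSourcePartitions, int newParallelism, int upperBound) {
        int upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);

        // Upward search: first p that divides the partition count evenly.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numSourcePartitions % p == 0) {
                return p;
            }
        }

        // Downward search: smallest p that keeps the same integer quotient
        // (partitions / parallelism) as newParallelism.
        int p = newParallelism;
        for (; p > 0; p--) {
            if (numSourcePartitions / p > numSourcePartitions / newParallelism) {
                if (numSourcePartitions % p != 0) {
                    p++;
                }
                break;
            }
        }
        // Assumption: the code after the hunk is not shown, so clamp to 1 and return.
        return Math.max(p, 1);
    }

    // Largest number of partitions any single subtask has to read: ceil(n / p).
    static int maxPartitionsPerSubtask(int numSourcePartitions, int parallelism) {
        return (numSourcePartitions + parallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        System.out.println(align(199, 99, 150));               // prints 67
        System.out.println(maxPartitionsPerSubtask(199, 67));  // prints 3
        System.out.println(maxPartitionsPerSubtask(199, 99));  // prints 3
        System.out.println(maxPartitionsPerSubtask(199, 100)); // prints 2
    }
}
```

Under these assumptions, 67 and 99 both leave the busiest subtask with 3 partitions, while 100 is the first value that brings it down to 2.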
   
   Well, I actually considered this situation at the very beginning: whether the upward search should use the same logic as the downward search, so as to maximize resource utilization.
   
   But our ultimate goal is balanced consumption across subtasks, since that is the healthiest situation. Max parallelism or the partition count will usually have quite a few divisors. You will rarely set the partition count to 1, 3, 7, 199, etc., so when developing this I thought about balance first. Only when the upperBound limit prevents alignment do we take a reasonable value downwards, and an event is emitted in that case.
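
As a rough illustration of the divisor argument, the sketch below runs only the upward alignment loop from the hunk. The class and method names, the partition counts 200 and 120, and the upper bounds are hypothetical values chosen for the example; `-1` is an invented marker meaning "no divisor found", which is where the real code would fall through to the downward search.

```java
final class UpwardAlignment {

    // Returns the first p in [start, min(partitions, upperBound)] that divides
    // the partition count evenly, or -1 if there is none.
    static int firstDivisorAtOrAbove(int partitions, int start, int upperBound) {
        int limit = Math.min(partitions, upperBound);
        for (int p = start; p <= limit; p++) {
            if (partitions % p == 0) {
                return p;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Counts with many divisors align after a small step upwards.
        System.out.println(firstDivisorAtOrAbove(200, 99, 150)); // prints 100
        System.out.println(firstDivisorAtOrAbove(120, 25, 120)); // prints 30
        // A prime count such as 199 has no divisor below itself.
        System.out.println(firstDivisorAtOrAbove(199, 99, 150)); // prints -1
    }
}
```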
   
   
   


