1996fanrui commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764419949


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -383,23 +403,68 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
-        var adjustByMaxParallelism =
-                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelism) {
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        final int numKeyGroupsOrPartitions;
+        final int upperBoundForAlignment;
+        if (numSourcePartitions <= 0) {
+            numKeyGroupsOrPartitions = maxParallelism;
+            upperBoundForAlignment =
+                    Math.min(
+                            // Optimize the case where newParallelism <= maxParallelism / 2
+                            newParallelism > maxParallelism / 2
+                                    ? maxParallelism
+                                    : maxParallelism / 2,
+                            upperBound);
+        } else {
+            numKeyGroupsOrPartitions = numSourcePartitions;
+            upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
+        }
+
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }
 
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+        // When adjust the parallelism after rounding up cannot be evenly divided by
+        // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
+        // current consumption rate.
+        int p = newParallelism;
+        for (; p > 0; p--) {
+            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
+                if (numKeyGroupsOrPartitions % p != 0) {
+                    p++;
+                }
+                break;
+            }
+        }

Review Comment:
   While reviewing this part in detail, I found that our earlier discussion does not cover all cases.
   
   For example: the source partition count is 199 and the new parallelism is 99. IIUC, the final parallelism is 67 (every subtask consumes 3 source partitions, except for the last subtask), right?
   
   But 100 as the final parallelism makes more sense to me (every subtask consumes 2 source partitions, except for the last subtask).
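   
   To make the trade-off concrete, here is a minimal standalone sketch (hypothetical, not operator code) that computes the maximum number of partitions any single subtask must consume at each candidate parallelism. The fallback result 67 keeps the same per-subtask bottleneck of 3 partitions as the originally computed 99, while 100 lowers it to 2:
   
   ```java
   // Hypothetical example, assuming 199 source partitions as in the case above.
   public class AlignmentExample {
       // Most partitions any single subtask must consume at the given parallelism.
       static int maxLoad(int partitions, int parallelism) {
           return (partitions + parallelism - 1) / parallelism; // ceiling division
       }
   
       public static void main(String[] args) {
           int partitions = 199;
           System.out.println(maxLoad(partitions, 99));  // 3 -> originally computed parallelism
           System.out.println(maxLoad(partitions, 67));  // 3 -> the fallback loop's result
           System.out.println(maxLoad(partitions, 100)); // 2 -> suggested parallelism
       }
   }
   ```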



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -383,23 +403,68 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
-        var adjustByMaxParallelism =
-                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelism) {
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        final int numKeyGroupsOrPartitions;
+        final int upperBoundForAlignment;
+        if (numSourcePartitions <= 0) {
+            numKeyGroupsOrPartitions = maxParallelism;
+            upperBoundForAlignment =
+                    Math.min(
+                            // Optimize the case where newParallelism <= maxParallelism / 2
+                            newParallelism > maxParallelism / 2
+                                    ? maxParallelism
+                                    : maxParallelism / 2,
+                            upperBound);
+        } else {
+            numKeyGroupsOrPartitions = numSourcePartitions;
+            upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
+        }
+
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks

Review Comment:
   ```suggestion
           // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
   ```



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -383,23 +403,68 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
-        var adjustByMaxParallelism =
-                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelism) {
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        final int numKeyGroupsOrPartitions;
+        final int upperBoundForAlignment;
+        if (numSourcePartitions <= 0) {
+            numKeyGroupsOrPartitions = maxParallelism;
+            upperBoundForAlignment =
+                    Math.min(
+                            // Optimize the case where newParallelism <= maxParallelism / 2
+                            newParallelism > maxParallelism / 2
+                                    ? maxParallelism
+                                    : maxParallelism / 2,
+                            upperBound);
+        } else {
+            numKeyGroupsOrPartitions = numSourcePartitions;
+            upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
+        }

Review Comment:
   >  Optimize the case where newParallelism <= maxParallelism / 2
   
   Why is this optimization needed? To reduce the number of `for` loop iterations?
   
   I'm also curious why the source partition path doesn't use this optimization. If both the source and key-group cases could use it, would the following code work?
   
   ```suggestion
           var numKeyGroupsOrPartitions = numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
           var upperBoundForAlignment =
                       Math.min(
                               // Optimize the case where newParallelism <= maxParallelism / 2
                               newParallelism > numKeyGroupsOrPartitions / 2
                                       ? numKeyGroupsOrPartitions
                                       : numKeyGroupsOrPartitions / 2,
                               upperBound);
   ```
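   
   If I read the ternary correctly, the `/ 2` bound relies on a general property: no divisor of `n` lies strictly between `n / 2` and `n`, because the quotient would have to fall strictly between 1 and 2. Since that holds for any positive `n`, it should apply equally to `numSourcePartitions`. A minimal standalone sketch (hypothetical, not operator code) checking the property:
   
   ```java
   // Hypothetical sketch, not operator code.
   public class DivisorBoundCheck {
       // For any positive n, no p with n/2 < p < n divides n evenly,
       // because the quotient n / p would have to lie strictly between 1 and 2.
       static boolean hasDivisorBetweenHalfAndN(int n) {
           for (int p = n / 2 + 1; p < n; p++) {
               if (n % p == 0) {
                   return true; // never reached for any positive n
               }
           }
           return false;
       }
   
       public static void main(String[] args) {
           for (int n = 1; n <= 10_000; n++) {
               if (hasDivisorBetweenHalfAndN(n)) {
                   throw new AssertionError("unexpected divisor for n=" + n);
               }
           }
           System.out.println("No divisor of n lies strictly between n/2 and n.");
       }
   }
   ```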



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
