mxm commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1754969067


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
 
         // Cap parallelism at either maxParallelism(number of key groups or source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+            // we try to adjust the parallelism such that it divides the maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions at a vertex,
+            // adjust the parallelism such that it divides the numPartitions without a remainder
+            // => Data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }
-        }
 
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+            // When the degree of parallelism after rounding up cannot be evenly divided by the
+            // source partition count, try to find the smallest parallelism that can satisfy the current
+            // consumption rate.
+            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p += 1;
+                    }

Review Comment:
   Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 once we have found a parallelism `p` that yields a greater value for `numPartitions / p` than the initial `numPartitions / newParallelism`, because that is the tipping point where we achieve the highest utilization in terms of partitions per subtask.
   
   I think we should return `newParallelism` if all adaptation logic fails, because falling back to a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.
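   
   For illustration, here is that fallback pulled out as a small, self-contained sketch (the method name, parameter list, and plain `int` return are invented for this example and are not the PR's actual code):
   
   ```java
   // Hypothetical standalone version of the fallback loop discussed above.
   static int smallestSufficientParallelism(
           int numPartitions, int newParallelism, int parallelismLowerLimit) {
       // Partitions per subtask (floor division) at the originally computed parallelism.
       int basePartitionsPerSubtask = numPartitions / newParallelism;
       for (int p = newParallelism; p > parallelismLowerLimit; p--) {
           if (numPartitions / p > basePartitionsPerSubtask) {
               // p is the tipping point where subtasks would have to consume more
               // partitions than before; if it does not divide numPartitions evenly,
               // step back up by one to stay at the original per-subtask load.
               if (numPartitions % p != 0) {
                   p += 1;
               }
               return p;
           }
       }
       // All adaptation logic failed: fall back to the originally computed
       // parallelism rather than the configured lower limit.
       return newParallelism;
   }
   ```
   
   For example, with `numPartitions = 16` and `newParallelism = 7` (at most 3 partitions per subtask), the loop stops at `p = 5` (16 / 5 = 3 > 16 / 7 = 2) and steps back up to 6, the smallest parallelism that still keeps at most 3 partitions per subtask.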



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -191,16 +200,29 @@ public ParallelismChange computeScaleTargetParallelism(
         double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
         LOG.debug("Capped target processing capacity for {} is {}", vertex, 
cappedTargetCapacity);
 
-        int newParallelism =
+        Tuple2<Integer, Optional<String>> newParallelism =
                 scale(

Review Comment:
   Fine with me. Alternatively, for tests, we could also pass in a test implementation of the event handler, which allows inspecting the generated events.
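   
   As a rough sketch of that approach (the `EventHandler` interface below is a simplification invented for this example; it is not the actual `AutoScalerEventHandler` signature), a recording implementation could look like this:
   
   ```java
   import java.util.ArrayList;
   import java.util.List;
   
   // Simplified stand-in for the autoscaler's event handler interface (hypothetical).
   interface EventHandler {
       void handleEvent(String reason, String message);
   }
   
   // Test implementation that records every event so a test can inspect it afterwards.
   class RecordingEventHandler implements EventHandler {
       final List<String> reasons = new ArrayList<>();
       final List<String> messages = new ArrayList<>();
   
       @Override
       public void handleEvent(String reason, String message) {
           reasons.add(reason);
           messages.add(message);
       }
   }
   ```
   
   A test would construct the scaler with the recording handler, trigger a scaling decision, and then assert on the recorded `reasons`/`messages` instead of (or in addition to) checking the returned tuple.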



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
 
         // Cap parallelism at either maxParallelism(number of key groups or source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
 
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+            // we try to adjust the parallelism such that it divides the maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions at a vertex,
+            // adjust the parallelism such that it divides the numPartitions without a remainder
+            // => Data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }

Review Comment:
   Right, I missed that. I was trying to generalize the two code blocks. How 
about the following?
   
   ```suggestion
           if (numPartitions <= 0) {
               upperBound = Math.min(maxParallelism / 2, upperBound);
           } else {
               upperBound = Math.min(numPartitions, upperBound);
               maxParallelism = numPartitions;
           }
           for (int p = newParallelism; p <= upperBound; p++) {
               if (maxParallelism % p == 0) {
                   return Tuple2.of(p, Optional.empty());
               }
           }
           ...
           // Resource optimization logic follows (if we can't achieve optimal partitioning)
           // (See review comment below)
           ...
           // If parallelism adjustment fails, use originally computed parallelism
           return Tuple2.of(newParallelism, Optional.empty());
   ```
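   
   Expanded into a standalone method, the unified divisor search sketched above would read roughly as follows (method name, parameter list, and the plain `int` return are illustrative only, not the PR's actual signature):
   
   ```java
   // Illustrative, self-contained version of the unified adjustment proposed above.
   static int adjustParallelism(
           int newParallelism,
           int maxParallelism,
           int numPartitions,
           int parallelismUpperLimit) {
       int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
       final int divisorTarget;
       if (numPartitions <= 0) {
           // No partition count known: look for a divisor of maxParallelism (key groups).
           upperBound = Math.min(maxParallelism / 2, upperBound);
           divisorTarget = maxParallelism;
       } else {
           // Partition count known: look for a divisor of numPartitions instead.
           upperBound = Math.min(numPartitions, upperBound);
           divisorTarget = numPartitions;
       }
       for (int p = newParallelism; p <= upperBound; p++) {
           if (divisorTarget % p == 0) {
               return p;
           }
       }
       // Adjustment failed: fall back to the originally computed parallelism.
       return newParallelism;
   }
   ```
   
   The resource-optimization fallback for the `numPartitions > 0` case (discussed in the other thread) would slot in between the loop and the final return.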



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
