huyuanfeng2018 commented on code in PR #879:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1753046204


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -191,16 +200,29 @@ public ParallelismChange computeScaleTargetParallelism(
         double cappedTargetCapacity = averageTrueProcessingRate * scaleFactor;
         LOG.debug("Capped target processing capacity for {} is {}", vertex, 
cappedTargetCapacity);
 
-        int newParallelism =
+        Tuple2<Integer, Optional<String>> newParallelism =
                 scale(

Review Comment:
   +1,
   ```suggestion
           int newParallelism =
               scale(
                   vertex,
                   currentParallelism,
                   inputShipStrategies,
                   (int) evaluatedMetrics.get(NUM_PARTITIONS).getCurrent(),
                   (int) evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                   scaleFactor,
                   Math.min(currentParallelism, 
conf.getInteger(VERTEX_MIN_PARALLELISM)),
                   Math.max(currentParallelism, 
conf.getInteger(VERTEX_MAX_PARALLELISM)),
                   scalingLimitedMsg -> autoScalerEventHandler.handleEvent(
                       context,
                       AutoScalerEventHandler.Type.Warning,
                       SCALING_LIMITED,
                       scalingLimitedMsg,
                       SCALING_LIMITED + vertex + cappedTargetCapacity,
                       conf.get(SCALING_EVENT_INTERVAL)));
   ```
   How about we pass in a function, This looks quite clear and does not require 
changing too many test cases.
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
 
         // Cap parallelism at either maxParallelism(number of key groups or 
source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, 
newParallelism), upperBound);
 
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || 
inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism 
without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= 
upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex 
is a source,
+            // we try to adjust the parallelism such that it divides the 
maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= 
upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed 
parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions at a vertex,
+            // adjust the parallelism such that it divides the numPartitions 
without a remainder
+            // => Data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= 
numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }
-        }
 
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+            // When the degree of parallelism after rounding up cannot be 
evenly divided by source
+            // PartitionCount, Try to find the smallest parallelism that can 
satisfy the current
+            // consumption rate.
+            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p += 1;
+                    }

Review Comment:
   Let me expand on my thoughts on this point:
   
   1. The first thing we need to consider is the p that can be divisible by the 
number of partitions when `p<upperBound` is satisfied.
   
   2. If step 1 fails to obtain the corresponding result, it means that we 
cannot guarantee the even consumption of the number of partitions when rounding 
up (due to uneven consumption, bottlenecks will exist in some tasks, so 
theoretically we get the largest upperBound The processing rate will not 
increase), at this time we consider taking a minimum value that is comparable 
to the current processing rate, satisfying newParallelism / partition = 
p/partition
   
   Here is an example:
    `numPartitions=35 ,newParallelism=20,  upperBound = 30;`
   
   step1 : We start from 20 and go up. Since upperBound = 30, we cannot obtain 
the number of partitions that can be consumed evenly. p=30 and p=20 have no 
change in the consumption rate.
   
   step2:
   Since `35/20 = 1 .... 15`, 20 degrees of parallelism cannot consume Kafka 
evenly, so at this time there will be 15 tasks consuming two partitions,5 task 
consuming two partitions,  so our final result only requires that there are 
tasks consuming two partition, then our processing rate won’t slow down
   
   That is `( numPartitions/p = 2 && numPartitions %n =0 || 
numPartitions/(p-1)=2 && (p-1) % !=0 )`.
   
   So `p+=1` here is caused by the indivisible partition obtained when fetching 
down. It should need +1 to meet the conditions.  `35 / 17 = 2` But eventually 
there will be a task consuming three partitions, so we need to add 17+1, 18 is 
our final result (17 tasks consume 2 partitions each, 1 task consumes one 
partition)
   
   step3:
   If p is already less than `parallelismLowerLimit` during the fetching 
process, we should directly use parallelismLowerLimit as the final degree of 
parallelism
   
   However, the above is all theoretical logic. I am not sure whether this will 
cause some negative effects. For example, the data distribution of the 
partition itself is uneven. Step 2 may aggravate this phenomenon, so I have 
reservations about this part of the logic, another approach is to directly 
return newParallelism
   
   
   
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
 
         // Cap parallelism at either maxParallelism(number of key groups or 
source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, 
newParallelism), upperBound);
 
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || 
inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism 
without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= 
upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex 
is a source,
+            // we try to adjust the parallelism such that it divides the 
maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= 
upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed 
parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions at a vertex,
+            // adjust the parallelism such that it divides the numPartitions 
without a remainder
+            // => Data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= 
numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }
-        }
 
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+            // When the degree of parallelism after rounding up cannot be 
evenly divided by source
+            // PartitionCount, Try to find the smallest parallelism that can 
satisfy the current
+            // consumption rate.
+            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p += 1;
+                    }
+                    var message =
+                            String.format(
+                                    SCALE_LIMITED_MESSAGE_FORMAT,
+                                    vertex,
+                                    newParallelism,
+                                    p,
+                                    String.format(
+                                            "numPartitions : 
%s,upperBound(maxParallelism or "
+                                                    + "parallelismUpperLimit): 
%s",
+                                            numPartitions, upperBound));
+                    return Tuple2.of(p, Optional.of(message));
+                }
+            }
+            // If a suitable degree of parallelism cannot be found, return 
parallelismLowerLimit
+            var message =
+                    String.format(
+                            SCALE_LIMITED_MESSAGE_FORMAT,
+                            vertex,
+                            newParallelism,
+                            parallelismLowerLimit,
+                            String.format("parallelismLowerLimit : %s", 
parallelismLowerLimit));
+            return Tuple2.of(parallelismLowerLimit, Optional.of(message));

Review Comment:
   I responded with the specific logic, but I think it's still worth discussing
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
 
         // Cap parallelism at either maxParallelism(number of key groups or 
source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, 
newParallelism), upperBound);
 
         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || 
inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is 
a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism 
without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= 
upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex 
is a source,
+            // we try to adjust the parallelism such that it divides the 
maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= 
upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed 
parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions at a vertex,
+            // adjust the parallelism such that it divides the numPartitions 
without a remainder
+            // => Data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= 
numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }

Review Comment:
   with respect to `p <= maxParallelism / 2 `
   
   When dealing with inputShipStrategies = hash, maxParallelism = 128, 
newParallelism = 78, I think newParallelism = 78 is acceptable, because not all 
tasks have a large state after keyby, 
   
   But for consuming kafka's vertex, this becomes unacceptable
   Imagine that Kafka with 128 partitions is consumed concurrently by 78 task  
:)
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to