huyuanfeng2018 commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1758427800


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -345,15 +356,22 @@ private boolean detectIneffectiveScaleUp(
      * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
      * parallelism for source and keyed vertex such that it divides the maxParallelism without a
      * remainder.
+     *
+     * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
+     * number of partitions if a vertex has a known partition count.

Review Comment:
   agree



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
+        } else {
+            upperBoundForAlignment = Math.min(numPartitions, upperBound);
+            numKeyGroupsOrPartitions = numPartitions;
+        }
+
+        // When the shuffle type of vertex inputs contains keyBy or the vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }
 
+        if (numPartitions > 0) {
+
+            // When the parallelism after rounding up cannot evenly divide the source
+            // numPartitions, try to find the smallest parallelism that can satisfy the
+            // current consumption rate.
+            int p = newParallelism;
+            for (; p > 0; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p++;
+                    }
+                    break;
+                }
+            }
+
+            p = Math.max(p, parallelismLowerLimit);
+
+            var message =
+                    String.format(
+                            SCALE_LIMITED_MESSAGE_FORMAT,
+                            vertex,
+                            newParallelism,
+                            p,
+                            String.format(
+                                    "numPartitions : %s, upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, "
+                                            + "parallelismLowerLimit: %s.",
+                                    numPartitions, upperBound, parallelismLowerLimit));

Review Comment:
   You are right.
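For reference, the alignment logic in the hunk above can be summarized as a standalone sketch. The class and method names here are hypothetical; the real implementation lives in `JobVertexScaler#scale` and additionally emits an event via the `message` shown in the diff.

```java
// Hypothetical standalone sketch of the parallelism-alignment logic discussed
// in this review; not the actual JobVertexScaler API.
public class AlignmentSketch {

    public static int align(
            int newParallelism,
            int maxParallelism,
            int numPartitions,
            int upperBound,
            int parallelismLowerLimit) {
        int numKeyGroupsOrPartitions = maxParallelism;
        int upperBoundForAlignment;
        if (numPartitions <= 0) {
            // No known partition count: align against the key groups.
            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
        } else {
            // Known partition count (e.g. a Kafka source): align against it.
            upperBoundForAlignment = Math.min(numPartitions, upperBound);
            numKeyGroupsOrPartitions = numPartitions;
        }

        // Round up to the next divisor => data is evenly spread across subtasks.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p;
            }
        }

        if (numPartitions > 0) {
            // Rounding up failed: search downwards for the smallest parallelism
            // that still matches the per-subtask partition load implied by
            // newParallelism.
            int p = newParallelism;
            for (; p > 0; p--) {
                if (numPartitions / p > numPartitions / newParallelism) {
                    if (numPartitions % p != 0) {
                        p++;
                    }
                    break;
                }
            }
            return Math.max(p, parallelismLowerLimit);
        }
        return newParallelism;
    }

    public static void main(String[] args) {
        // 32 partitions, requested 6: rounds up to 8, since 32 % 8 == 0.
        System.out.println(align(6, 128, 32, 24, 1)); // prints 8
        // 10 partitions, requested 7, alignment bound 8: falls back down to 5.
        System.out.println(align(7, 128, 10, 8, 1)); // prints 5
    }
}
```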
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;

Review Comment:
   Not necessarily; a source vertex may also have keyed state, in which case it should be aligned with maxParallelism.
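To illustrate the keyed-state side of this argument: Flink assigns key group `kg` to subtask `kg * parallelism / maxParallelism` (see `KeyGroupRangeAssignment`), so when the parallelism divides maxParallelism every subtask owns the same number of key groups; otherwise some subtasks own one more than others. A small hypothetical sketch:

```java
// Hedged illustration of key-group spread; the assignment formula mirrors
// Flink's KeyGroupRangeAssignment#computeOperatorIndexForKeyGroup.
public class KeyGroupSpread {

    public static int[] keyGroupsPerSubtask(int maxParallelism, int parallelism) {
        int[] counts = new int[parallelism];
        for (int kg = 0; kg < maxParallelism; kg++) {
            counts[kg * parallelism / maxParallelism]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        // 128 key groups over 8 subtasks: perfectly even, 16 each.
        System.out.println(java.util.Arrays.toString(keyGroupsPerSubtask(128, 8)));
        // 128 key groups over 6 subtasks: uneven, a mix of 21s and 22s.
        System.out.println(java.util.Arrays.toString(keyGroupsPerSubtask(128, 6)));
    }
}
```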
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
+        } else {
+            upperBoundForAlignment = Math.min(numPartitions, upperBound);

Review Comment:
   upperBound is already the smaller of parallelismUpperLimit and maxParallelism.
   
   



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);

Review Comment:
   Yes, but I think not all tasks downstream of a hash shuffle carry large Flink keyed state. In that scenario, misalignment has almost no impact on the task; I think this is the performance-versus-resources trade-off the previous logic took into account.
   
   But this becomes much less acceptable when consuming from Kafka or Pulsar.
   
   @mxm @1996fanrui Maybe we can discuss this logic further
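To make the Kafka/Pulsar concern concrete, here is a hypothetical illustration of the skew a misaligned parallelism causes. It uses simple round-robin partition assignment, which approximates but is not identical to the connectors' actual assignment strategies:

```java
// Hypothetical sketch: with 32 partitions and parallelism 6, partitions spread
// 6/6/5/5/5/5, so two subtasks carry ~20% more load than the others; with
// parallelism 8 every subtask reads exactly 4 partitions.
public class PartitionSkew {

    public static int[] partitionsPerSubtask(int numPartitions, int parallelism) {
        int[] counts = new int[parallelism];
        for (int partition = 0; partition < numPartitions; partition++) {
            counts[partition % parallelism]++; // simplified round-robin assignment
        }
        return counts;
    }

    public static void main(String[] args) {
        // Misaligned: prints [6, 6, 5, 5, 5, 5]
        System.out.println(java.util.Arrays.toString(partitionsPerSubtask(32, 6)));
        // Aligned: prints [4, 4, 4, 4, 4, 4, 4, 4]
        System.out.println(java.util.Arrays.toString(partitionsPerSubtask(32, 8)));
    }
}
```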
   
   
   
   
   
   
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
