1996fanrui commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1766114467


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }
 
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);

Review Comment:
   Hey @mxm, sorry for the late reply. There were too many comments and I missed this one.
   
   > @1996fanrui The code path you are pointing to hasn't been changed by this PR. It has merely been refactored. In the scenario of maxParallelism=100 and newParallelism=80, the resulting parallelism would always be 80.
   > 
   > Are you asking to expand the source logic introduced here to hash keyed state?
   
   Yes, I'm asking about hash keyed state: I didn't understand why we recommend a result of 80 instead of 100 for the hash keyed state case. But I think you have answered my question in this comment.
   
   > > For this scenario, there is no difference when the parallelism is set to 50 and 99.
   > 
   > That depends on the amount of state in the key groups and other factors like hot keys. 50 could be the same as 99, it could also be much worse. For sources, the problem is much more amplified because they usually have pretty evenly balanced partitions and there is extra overhead to fetch the partition data.
   
   Good point! Hot keys may occur in a Flink job, whereas source partitions are generally free of data skew.
    
   > However, when `parallelism > maxParallelism / 2`, this logic is flawed because maxParallelism itself could be a possible divisor. We should really be going up to `maxParallelism` for the initial `parallelism > maxParallelism / 2`. We could just skip this (premature) optimization entirely.
   
   This is exactly my question; I think we should use `maxParallelism` as the final parallelism (for the above example, 100 instead of 80).
   
   > ```java
   >     upperBoundForAlignment = Math.min(
   >         // Optimize the case where newParallelism <= maxParallelism / 2
   >         newParallelism > maxParallelism / 2 ? maxParallelism : maxParallelism / 2,
   >         upperBound
   >     );
   > ```
   
   Great, this part solves my concern. Thank you~
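   For readers following along, here is a minimal standalone sketch (not the operator's actual `JobVertexScaler#scale` method; names are illustrative) of the divisor-alignment behavior discussed above, including the suggested fix: when `newParallelism > maxParallelism / 2`, the search continues up to `maxParallelism` itself so that `maxParallelism` remains a candidate divisor.

   ```java
   public class AlignmentSketch {

       /**
        * Finds the first parallelism >= newParallelism (within bounds) that divides
        * maxParallelism evenly, so key groups spread uniformly across subtasks.
        */
       static int align(int newParallelism, int maxParallelism, int upperBound) {
           int upperBoundForAlignment =
                   Math.min(
                           // Optimize the case where newParallelism <= maxParallelism / 2
                           newParallelism > maxParallelism / 2
                                   ? maxParallelism
                                   : maxParallelism / 2,
                           upperBound);
           for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
               if (maxParallelism % p == 0) {
                   return p;
               }
           }
           // No even divisor found within bounds; keep the computed parallelism.
           return newParallelism;
       }

       public static void main(String[] args) {
           // maxParallelism=100, newParallelism=80: with the fix the search reaches
           // 100, which divides 100 evenly. Without it, the loop would stop at
           // maxParallelism / 2 = 50 and return 80 unaligned.
           System.out.println(align(80, 100, 100)); // prints 100
           // maxParallelism=100, newParallelism=20: 20 already divides 100 evenly.
           System.out.println(align(20, 100, 100)); // prints 20
       }
   }
   ```

   With the original upper bound of `maxParallelism / 2`, the first call above would have returned 80 (no divisor in [80, 50] is ever checked), which is exactly the case raised in this thread.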


