huyuanfeng2018 commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1756220655
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
         // Cap parallelism at either maxParallelism (number of key groups or source partitions)
         // or the parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);

         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);

         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of the vertex inputs contains keyBy or the vertex is a
+            // source, we try to adjust the parallelism such that it divides the maxParallelism
+            // without a remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use the originally computed parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+            // When we know the numPartitions of a vertex, adjust the parallelism such that it
+            // divides the numPartitions without a remainder
+            // => data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }
-        }

-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+            // When the rounded-up parallelism cannot evenly divide the source partition count,
+            // try to find the smallest parallelism that can still satisfy the current
+            // consumption rate.
+            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p += 1;
+                    }

Review Comment:
   > Thanks for explaining in detail. I misread some of the code. It is correct that we need to add +1 when we have found a parallelism which yields a greater value for `num_partitions / p` than the initial `num_partitions / new_parallelism`, because we have found the tipping point where we achieve the most utilization in terms of partitions per task.
   >
   > I think we should return `new_parallelism` if all adaptation logic fails, because using a potentially very small configured lower parallelism could make things a lot worse due to resource constraints.

   Let me explain the reason for using `parallelismLowerLimit` with an example:

   `numPartitions = 35, newParallelism = 24, upperBound = 30, parallelismLowerLimit = 19`

   Step 1 cannot produce a result, so we fall through to step 2, but step 2 also fails: the parallelism it would choose is 18, which is below `parallelismLowerLimit = 19`. So it eventually returns 19.
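   For anyone following along, below is a minimal, self-contained Java sketch of the two-step adjustment that reproduces this example. It is not the PR's actual method: the hunk above is truncated before its return, so the method name `adjustParallelism`, the bare-int return type, and the final fallback line are assumptions based on this discussion.

    // Standalone sketch of the partition-based adjustment being discussed.
    // Assumed/illustrative: method name, int return type, final fallback.
    public class PartitionAdjustmentSketch {

        static int adjustParallelism(
                int numPartitions, int newParallelism, int upperBound, int parallelismLowerLimit) {
            // Step 1: scan upwards for a parallelism that divides numPartitions evenly.
            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
                if (numPartitions % p == 0) {
                    return p;
                }
            }
            // Step 2: scan downwards for the tipping point where partitions-per-subtask
            // (numPartitions / p) first exceeds what newParallelism yields; if that p
            // does not divide numPartitions evenly, step back up by one (the "+1"
            // discussed in the quoted comment).
            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
                if (numPartitions / p > numPartitions / newParallelism) {
                    if (numPartitions % p != 0) {
                        p += 1;
                    }
                    return p;
                }
            }
            // Both steps failed: the PR falls back to parallelismLowerLimit here,
            // while the quoted reviewer argues for newParallelism instead.
            return parallelismLowerLimit;
        }

        public static void main(String[] args) {
            // The example above: step 1 finds no divisor of 35 in [24, 30]; step 2's
            // loop bound (p > 19) stops at p = 20, before reaching the tipping point
            // at p = 17 that would have yielded 18. Result: 19.
            System.out.println(adjustParallelism(35, 24, 30, 19)); // prints 19
        }
    }

   Swapping the last line of `adjustParallelism` for `return newParallelism;` gives the alternative fallback proposed in the quoted comment.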