huyuanfeng2018 commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764464407
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -383,23 +403,68 @@ protected static int scale(

         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);

-        var adjustByMaxParallelism =
-                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelism) {
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
             return newParallelism;
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        final int numKeyGroupsOrPartitions;
+        final int upperBoundForAlignment;
+        if (numSourcePartitions <= 0) {
+            numKeyGroupsOrPartitions = maxParallelism;
+            upperBoundForAlignment =
+                    Math.min(
+                            // Optimize the case where newParallelism <= maxParallelism / 2
+                            newParallelism > maxParallelism / 2
+                                    ? maxParallelism
+                                    : maxParallelism / 2,
+                            upperBound);
+        } else {
+            numKeyGroupsOrPartitions = numSourcePartitions;
+            upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
+        }
+
+        // When the shuffle type of the vertex inputs contains keyBy or the vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }

-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+        // When the parallelism after rounding up cannot divide numKeyGroupsOrPartitions evenly,
+        // try to find the smallest parallelism that can satisfy the current consumption rate.
+        int p = newParallelism;
+        for (; p > 0; p--) {
+            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
+                if (numKeyGroupsOrPartitions % p != 0) {
+                    p++;
+                }
+                break;
+            }
+        }

Review Comment:
> I found that our discussion does not cover all cases while reviewing this part in detail.
>
> For example: the source partition count is 199 and the new parallelism is 99. IIUC, the final parallelism is 67 (every subtask consumes 3 source partitions, except for the last subtask), right?
>
> But 100 as the final parallelism makes sense to me (every subtask consumes 2 source partitions, except for the last subtask).

Well, I actually considered this situation at the very beginning, that is, whether rounding up should reuse the same logic as rounding down so as to maximize resource utilization. But our ultimate goal is balanced consumption across subtasks; that is the healthiest situation. The max parallelism or the partition count will usually have quite a few divisors, and you will rarely set the partition count to 1, 3, 7, 199, etc., so during development I prioritized balance first. If it is the limitation of the upper bound that forces us to take a reasonable value downwards instead, an event will be emitted.
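To make the behavior concrete, here is a minimal, self-contained sketch of the two loops under discussion (the round-up divisor search and the round-down fallback). This is not the actual `JobVertexScaler` code: the class name `AlignmentSketch`, the method `align`, and the concrete bound `upperBound = 120` are assumptions chosen only so the run reproduces the 199-partition example from this thread.

```java
public class AlignmentSketch {

    static int align(int newParallelism, int numKeyGroupsOrPartitions, int upperBound) {
        // For a source vertex the alignment target is the partition count itself.
        int upperBoundForAlignment = Math.min(numKeyGroupsOrPartitions, upperBound);

        // Round-up loop: take the next parallelism that divides the
        // partition / key-group count without a remainder => balanced subtasks.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p;
            }
        }

        // Fallback (the loop under review): no divisor was found, so walk down
        // until the per-subtask quota N / p would exceed the baseline quota
        // N / newParallelism, then step back one unless p divides N exactly.
        int p = newParallelism;
        for (; p > 0; p--) {
            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
                if (numKeyGroupsOrPartitions % p != 0) {
                    p++;
                }
                break;
            }
        }
        return p;
    }

    public static void main(String[] args) {
        // The example from the review: 199 source partitions, computed
        // parallelism 99, and an assumed upper bound of 120 so the round-up
        // loop cannot reach the only divisor of the prime 199 (199 itself).
        System.out.println(align(99, 199, 120)); // prints 67
    }
}
```

Since 199 is prime, the round-up loop finds no divisor within the bound and the fallback settles on 67 (66 subtasks consuming 3 partitions each plus one subtask consuming 1), which is exactly the outcome the review weighs against rounding up to 100 instead.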