1996fanrui commented on code in PR #879: URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1764419949
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ##########

```diff
@@ -383,23 +403,68 @@ protected static int scale(
         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);
-        var adjustByMaxParallelism =
-                inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
-        if (!adjustByMaxParallelism) {
+        var adjustByMaxParallelismOrPartitions =
+                numSourcePartitions > 0 || inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelismOrPartitions) {
             return newParallelism;
         }
-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        final int numKeyGroupsOrPartitions;
+        final int upperBoundForAlignment;
+        if (numSourcePartitions <= 0) {
+            numKeyGroupsOrPartitions = maxParallelism;
+            upperBoundForAlignment =
+                    Math.min(
+                            // Optimize the case where newParallelism <= maxParallelism / 2
+                            newParallelism > maxParallelism / 2
+                                    ? maxParallelism
+                                    : maxParallelism / 2,
+                            upperBound);
+        } else {
+            numKeyGroupsOrPartitions = numSourcePartitions;
+            upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
+        }
+
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }
-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+        // When adjust the parallelism after rounding up cannot be evenly divided by
+        // numKeyGroupsOrPartitions, Try to find the smallest parallelism that can satisfy the
+        // current consumption rate.
+        int p = newParallelism;
+        for (; p > 0; p--) {
+            if (numKeyGroupsOrPartitions / p > numKeyGroupsOrPartitions / newParallelism) {
+                if (numKeyGroupsOrPartitions % p != 0) {
+                    p++;
+                }
+                break;
+            }
+        }
```

Review Comment:
While reviewing this part in detail, I found that our earlier discussion doesn't cover all cases. For example, suppose there are 199 source partitions and the new parallelism is 99. IIUC, the final parallelism would be 67 (every subtask consumes 3 source partitions, except for the last one), right? But 100 as the final parallelism makes more sense to me (every subtask consumes 2 source partitions, except for the last one).
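To make the 199-partition / parallelism-99 example easy to re-run, here is a minimal standalone sketch of the source-partition path from the quoted diff. `PartitionAlignmentSketch`, `alignToPartitions`, and `maxPartitionsPerSubtask` are hypothetical names, not operator code. The sketch assumes the code after the quoted `break` simply returns `p`, since the diff excerpt ends there. It also assumes `upperBound < 199`: 199 is prime, so with `upperBound >= 199` the first loop would return 199 and never reach the fallback.

```java
final class PartitionAlignmentSketch {

    // Hypothetical standalone copy of the source-partition path from the diff above.
    // Assumption: the code after the quoted `break` returns p (the excerpt ends there).
    static int alignToPartitions(int newParallelism, int numPartitions, int upperBound) {
        int upperBoundForAlignment = Math.min(numPartitions, upperBound);

        // First pass: round up to the nearest divisor of numPartitions within the bound.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }

        // Fallback from the diff: walk down until each subtask's integer share
        // (numPartitions / p) grows, then step back up by one if p is not a divisor.
        int p = newParallelism;
        for (; p > 0; p--) {
            if (numPartitions / p > numPartitions / newParallelism) {
                if (numPartitions % p != 0) {
                    p++;
                }
                break;
            }
        }
        return p;
    }

    // Largest number of partitions any single subtask must read: ceil(numPartitions / p).
    static int maxPartitionsPerSubtask(int numPartitions, int parallelism) {
        return (numPartitions + parallelism - 1) / parallelism;
    }

    public static void main(String[] args) {
        int aligned = alignToPartitions(99, 199, 150); // assumed upperBound = 150 (< 199)
        System.out.println("aligned parallelism: " + aligned); // 67
        System.out.println("max partitions/subtask at 67:  " + maxPartitionsPerSubtask(199, 67)); // 3
        System.out.println("max partitions/subtask at 99:  " + maxPartitionsPerSubtask(199, 99)); // 3
        System.out.println("max partitions/subtask at 100: " + maxPartitionsPerSubtask(199, 100)); // 2
    }
}
```

Under these assumptions the fallback lands on 67, which keeps the same worst-case load per subtask as 99 (3 partitions). Parallelism 100 is the first value that lowers that worst case to 2.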
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ##########

```diff
@@ -383,23 +403,68 @@ protected static int scale(
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
```

Review Comment:
```suggestion
        // the numKeyGroupsOrPartitions without a remainder => data is evenly spread across subtasks
```

########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ##########

```diff
@@ -383,23 +403,68 @@ protected static int scale(
+        final int numKeyGroupsOrPartitions;
+        final int upperBoundForAlignment;
+        if (numSourcePartitions <= 0) {
+            numKeyGroupsOrPartitions = maxParallelism;
+            upperBoundForAlignment =
+                    Math.min(
+                            // Optimize the case where newParallelism <= maxParallelism / 2
+                            newParallelism > maxParallelism / 2
+                                    ? maxParallelism
+                                    : maxParallelism / 2,
+                            upperBound);
+        } else {
+            numKeyGroupsOrPartitions = numSourcePartitions;
+            upperBoundForAlignment = Math.min(numSourcePartitions, upperBound);
+        }
```

Review Comment:
> Optimize the case where newParallelism <= maxParallelism / 2

Why do we need this optimization? Is it to reduce the number of `for` loop iterations? I'm also curious why the source-partition branch doesn't use it. If both the source-partition and key-group branches could use this optimization, would the following code work?

```suggestion
        var numKeyGroupsOrPartitions =
                numSourcePartitions <= 0 ? maxParallelism : numSourcePartitions;
        var upperBoundForAlignment =
                Math.min(
                        // Optimize the case where newParallelism <= maxParallelism / 2
                        newParallelism > numKeyGroupsOrPartitions / 2
                                ? numKeyGroupsOrPartitions
                                : numKeyGroupsOrPartitions / 2,
                        upperBound);
```
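The following applies to the question of whether the halved bound is only a loop-count optimization. Any divisor of n other than n itself is at most n / 2: if p divides n and p < n, then n / p >= 2. So when newParallelism <= n / 2, capping the first loop at n / 2 saves iterations and also excludes exactly one candidate, p = n. The sketch below checks that fact by brute force. It then shows what the suggested unified bound would change for 199 source partitions at parallelism 99, with an assumed `upperBound` of 200. `AlignmentBoundSketch` and its methods are hypothetical names used only for illustration.

```java
final class AlignmentBoundSketch {

    // Bound used by the key-group branch in the diff; the suggestion applies the same
    // formula with n = numKeyGroupsOrPartitions for both branches.
    static int halvedBound(int newParallelism, int n, int upperBound) {
        return Math.min(newParallelism > n / 2 ? n : n / 2, upperBound);
    }

    // Bound used by the source-partition branch in the diff.
    static int fullBound(int n, int upperBound) {
        return Math.min(n, upperBound);
    }

    // First pass of the alignment loop: smallest divisor of n in [newParallelism, bound],
    // or -1 if there is none (the real code would then take the fallback path).
    static int firstDivisorUpTo(int newParallelism, int n, int bound) {
        for (int p = newParallelism; p <= bound; p++) {
            if (n % p == 0) {
                return p;
            }
        }
        return -1;
    }

    // Brute-force check: does any p with n/2 < p < n divide n?
    static boolean hasDivisorStrictlyBetweenHalfAndN(int n) {
        for (int p = n / 2 + 1; p < n; p++) {
            if (n % p == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        boolean anyFound = false;
        for (int n = 1; n <= 10_000; n++) {
            anyFound |= hasDivisorStrictlyBetweenHalfAndN(n);
        }
        System.out.println("divisor in (n/2, n) for some n <= 10000: " + anyFound); // false

        // 199 source partitions, newParallelism = 99, assumed upperBound = 200.
        System.out.println(firstDivisorUpTo(99, 199, fullBound(199, 200)));       // 199
        System.out.println(firstDivisorUpTo(99, 199, halvedBound(99, 199, 200))); // -1 -> fallback
    }
}
```

For key groups, the halved bound means the first loop never jumps straight to maxParallelism when newParallelism <= maxParallelism / 2. If the same bound were applied to source partitions, the 199/99 case above would skip returning 199 and go to the fallback path instead.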