mxm commented on code in PR #879: URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1756999617
########## flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java: ########## @@ -378,28 +405,70 @@ protected static int scale( // Cap parallelism at either maxParallelism(number of key groups or source partitions) or // parallelism upper limit - final int upperBound = Math.min(maxParallelism, parallelismUpperLimit); + int upperBound = Math.min(maxParallelism, parallelismUpperLimit); // Apply min/max parallelism newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound); var adjustByMaxParallelism = inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH); if (!adjustByMaxParallelism) { - return newParallelism; + return Tuple2.of(newParallelism, Optional.empty()); } - // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to - // adjust the parallelism such that it divides the maxParallelism without a remainder - // => data is evenly spread across subtasks - for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) { - if (maxParallelism % p == 0) { - return p; + if (numPartitions <= 0) { + // When the shuffle type of vertex inputs contains keyBy or vertex is a source, + // we try to adjust the parallelism such that it divides the maxParallelism without a + // remainder => data is evenly spread across subtasks + for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) { + if (maxParallelism % p == 0) { + return Tuple2.of(p, Optional.empty()); + } + } + // If parallelism adjustment fails, use originally computed parallelism + return Tuple2.of(newParallelism, Optional.empty()); + } else { + + // When we know the numPartitions at a vertex, + // adjust the parallelism such that it divides the numPartitions without a remainder + // => Data is evenly distributed among subtasks + for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) { + if (numPartitions % p == 0) { + return Tuple2.of(p, Optional.empty()); + } } Review Comment: Can we call them like this? 1. adjustableMaxParallelism => numKeyGroupsOrPartitions 2. adjustableUpperBound => upperBoundForAlignment Adjustable just doesn't tell someone who is unfamiliar with the code very much. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org