huyuanfeng2018 commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1758427800
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -345,15 +356,22 @@ private boolean detectIneffectiveScaleUp(
      * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
      * parallelism for source and keyed vertex such that it divides the maxParallelism without a
      * remainder.
+     *
+     * <p>This method also attempts to adjust the parallelism to ensure it aligns well with the
+     * number of partitions if a vertex has a known partition count.

Review Comment:
   agree

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
+        } else {
+            upperBoundForAlignment = Math.min(numPartitions, upperBound);
+            numKeyGroupsOrPartitions = numPartitions;
+        }
+
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+        // we try to adjust the parallelism such that it divides
+        // the adjustableMaxParallelism without a remainder => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
+            if (numKeyGroupsOrPartitions % p == 0) {
                 return p;
             }
         }

+        if (numPartitions > 0) {
+
+            // When the parallelism after rounding up cannot evenly divide the source
+            // numPartitions, try to find the smallest parallelism that can satisfy the current
+            // consumption rate.
+            int p = newParallelism;
+            for (; p > 0; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p++;
+                    }
+                    break;
+                }
+            }
+
+            p = Math.max(p, parallelismLowerLimit);
+
+            var message =
+                    String.format(
+                            SCALE_LIMITED_MESSAGE_FORMAT,
+                            vertex,
+                            newParallelism,
+                            p,
+                            String.format(
+                                    "numPartitions : %s,upperBoundForAlignment(maxParallelism or parallelismUpperLimit): %s, "
+                                            + "parallelismLowerLimit: %s.",
+                                    numPartitions, upperBound, parallelismLowerLimit));

Review Comment:
   You are right.

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;

Review Comment:
   Not necessarily; a source vertex may also have keyed state, and then it should be aligned with maxParallelism.
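For reference, below is a minimal, self-contained sketch of the alignment behaviour this hunk is aiming for, assuming that a non-positive numPartitions means the partition count is unknown and that upperBound is already min(parallelismUpperLimit, maxParallelism). The class and method names are illustrative only, and the event/message reporting in the PR is omitted; this is not the operator's actual code:

```java
// Minimal sketch of the alignment idea discussed in the hunk above; the class and method
// names are illustrative and do not reproduce the operator's actual code. "upperBound" is
// assumed to already be min(parallelismUpperLimit, maxParallelism); a non-positive
// numPartitions means the source partition count is unknown.
public final class ParallelismAlignmentSketch {

    static int align(
            int newParallelism,
            int numPartitions,
            int maxParallelism,
            int upperBound,
            int parallelismLowerLimit) {

        int numKeyGroupsOrPartitions = numPartitions > 0 ? numPartitions : maxParallelism;
        int upperBoundForAlignment =
                numPartitions > 0
                        ? Math.min(numPartitions, upperBound)
                        : Math.min(maxParallelism / 2, upperBound);

        // First try to scale up to a parallelism that divides the key groups / partitions evenly.
        for (int p = newParallelism; p <= upperBoundForAlignment; p++) {
            if (numKeyGroupsOrPartitions % p == 0) {
                return p;
            }
        }

        if (numPartitions > 0) {
            // No even divisor found: search downwards for the smallest parallelism whose
            // partitions-per-subtask ratio (floor) is no worse than at newParallelism.
            int p = newParallelism;
            for (; p > 0; p--) {
                if (numPartitions / p > numPartitions / newParallelism) {
                    if (numPartitions % p != 0) {
                        p++;
                    }
                    break;
                }
            }
            return Math.max(p, parallelismLowerLimit);
        }
        return newParallelism;
    }

    public static void main(String[] args) {
        // 7 requested subtasks, 32 partitions: 8 divides 32 evenly, so scale up to 8.
        System.out.println(align(7, 32, 128, 32, 1));  // 8
        // 11 requested subtasks, 13 partitions, upper bound 12: no even divisor exists,
        // so fall back to 7, the smallest parallelism that still caps at 2 partitions per subtask.
        System.out.println(align(11, 13, 128, 12, 1)); // 7
        // Unknown partition count: align to the 128 key groups instead (128 % 8 == 0).
        System.out.println(align(6, -1, 128, 64, 1));  // 8
    }
}
```

In the second call the fallback returns 7, which keeps the maximum number of partitions read by any subtask at 2 (the same as at parallelism 11) while using fewer slots, matching the "smallest parallelism that can satisfy the current consumption rate" comment in the hunk.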
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);
+        } else {
+            upperBoundForAlignment = Math.min(numPartitions, upperBound);

Review Comment:
   upperBound is already the smaller of parallelismUpperLimit and maxParallelism.

##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -389,15 +407,61 @@ protected static int scale(
             return newParallelism;
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
+        int numKeyGroupsOrPartitions = maxParallelism;
+        int upperBoundForAlignment;
+        if (numPartitions <= 0) {
+            upperBoundForAlignment = Math.min(maxParallelism / 2, upperBound);

Review Comment:
   Yes, but I think not all keyed (hash-shuffled) tasks carry large Flink keyed state, and in that scenario missing the alignment has almost no impact on the job. I think that is the trade-off between performance and resources that the previous logic accepted, but it becomes less acceptable when consuming from Kafka or Pulsar.

   @mxm @1996fanrui Maybe we can discuss this logic further.
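To make that trade-off concrete, a small illustrative calculation with hypothetical numbers (not taken from the PR): with 128 key groups a non-aligned parallelism skews the per-subtask load by at most one key group, whereas with 13 partitions some subtasks end up reading twice as many partitions as others.

```java
// Hypothetical numbers, not from the PR: compares the load imbalance caused by a
// non-aligned parallelism for a keyed vertex (key groups) versus a partitioned source.
public final class SkewIllustration {

    // Maximum units (key groups or partitions) assigned to any single subtask: ceil(units / p).
    static int maxPerSubtask(int units, int parallelism) {
        return (units + parallelism - 1) / parallelism;
    }

    // Minimum units assigned to any single subtask: floor(units / p).
    static int minPerSubtask(int units, int parallelism) {
        return units / parallelism;
    }

    public static void main(String[] args) {
        // Keyed vertex: 128 key groups over 11 subtasks -> 12 vs 11 key groups (~9% imbalance).
        System.out.printf("key groups: max=%d min=%d%n", maxPerSubtask(128, 11), minPerSubtask(128, 11));
        // Source vertex: 13 partitions over 11 subtasks -> 2 vs 1 partitions (100% imbalance).
        System.out.printf("partitions: max=%d min=%d%n", maxPerSubtask(13, 11), minPerSubtask(13, 11));
    }
}
```

Roughly a 9% imbalance for the keyed vertex versus a 100% imbalance for the partitioned source, which is why the alignment matters far more for Kafka or Pulsar consumers than for keyed state.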