huyuanfeng2018 commented on code in PR #879:
URL: https://github.com/apache/flink-kubernetes-operator/pull/879#discussion_r1753039976
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java:
##########
@@ -378,28 +405,70 @@ protected static int scale(
         // Cap parallelism at either maxParallelism(number of key groups or source partitions) or
         // parallelism upper limit
-        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
+        int upperBound = Math.min(maxParallelism, parallelismUpperLimit);

         // Apply min/max parallelism
         newParallelism = Math.min(Math.max(parallelismLowerLimit, newParallelism), upperBound);

         var adjustByMaxParallelism =
                 inputShipStrategies.isEmpty() || inputShipStrategies.contains(HASH);
         if (!adjustByMaxParallelism) {
-            return newParallelism;
+            return Tuple2.of(newParallelism, Optional.empty());
         }

-        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
-        // adjust the parallelism such that it divides the maxParallelism without a remainder
-        // => data is evenly spread across subtasks
-        for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
-            if (maxParallelism % p == 0) {
-                return p;
+        if (numPartitions <= 0) {
+            // When the shuffle type of vertex inputs contains keyBy or vertex is a source,
+            // we try to adjust the parallelism such that it divides the maxParallelism without a
+            // remainder => data is evenly spread across subtasks
+            for (int p = newParallelism; p <= maxParallelism / 2 && p <= upperBound; p++) {
+                if (maxParallelism % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
+            }
+            // If parallelism adjustment fails, use originally computed parallelism
+            return Tuple2.of(newParallelism, Optional.empty());
+        } else {
+
+            // When we know the numPartitions at a vertex,
+            // adjust the parallelism such that it divides the numPartitions without a remainder
+            // => Data is evenly distributed among subtasks
+            for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
+                if (numPartitions % p == 0) {
+                    return Tuple2.of(p, Optional.empty());
+                }
             }
-        }

-        // If parallelism adjustment fails, use originally computed parallelism
-        return newParallelism;
+            // When the parallelism after rounding up does not divide the source partition count
+            // evenly, try to find the smallest parallelism that can still satisfy the current
+            // consumption rate.
+            for (int p = newParallelism; p > parallelismLowerLimit; p--) {
+                if (numPartitions / p > numPartitions / newParallelism) {
+                    if (numPartitions % p != 0) {
+                        p += 1;
+                    }

Review Comment:
Let me expand on my thoughts on this point:

1. The first thing we need to consider is a `p` that divides the number of partitions without a remainder while `p <= upperBound` is satisfied.
2. If step 1 finds no such `p`, we cannot guarantee that the partitions are consumed evenly after rounding up (because of the uneven assignment, some subtasks become bottlenecks, so raising the parallelism all the way to `upperBound` will theoretically not increase the processing rate). In that case we take the smallest value that still matches the current processing rate, i.e. one that satisfies `ceil(numPartitions / p) == ceil(numPartitions / newParallelism)`.

Here is an example: `numPartitions = 35, newParallelism = 20, upperBound = 30`.

Step 1: We start from 20 and scan upwards. Since `upperBound = 30`, we cannot find a parallelism that divides the partitions evenly; p = 30 and p = 20 yield the same consumption rate (in both cases the busiest subtask still reads two partitions).
Step 2: Since `35 / 20 = 1` with a remainder of `15`, a parallelism of 20 cannot consume the Kafka partitions evenly: 15 subtasks read two partitions and 5 subtasks read one partition. So the final result only needs to keep the busiest subtasks at two partitions; then the processing rate will not slow down, i.e. we want the smallest `p` with `ceil(numPartitions / p) == 2`. The `p += 1` in the code handles the case where the value found while stepping down does not divide the partitions: `35 / 17 = 2`, but with 17 subtasks one of them would end up reading three partitions, so we add one, and 18 is the final result (17 subtasks read 2 partitions each, 1 subtask reads 1 partition).

Step 3: If `p` already drops below `parallelismLowerLimit` while stepping down, we should simply use `parallelismLowerLimit` as the final parallelism.

However, the above is all theoretical. I am not sure whether it could have negative effects: for example, if the data distribution across the partitions is itself uneven, step 2 may aggravate that skew, so I have reservations about this part of the logic. Another approach is to simply return `newParallelism`.
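To make the intent concrete, here is a minimal, self-contained sketch of steps 1-3 (assuming positive inputs and a lower limit of at least 1). The class name `PartitionAwareScaleSketch`, the method `adjustToNumPartitions`, the `ceilDiv` helper, and the ceil-based formulation are only for illustration and are not the actual `JobVertexScaler` change:

```java
public class PartitionAwareScaleSketch {

    static int adjustToNumPartitions(
            int newParallelism, int numPartitions, int upperBound, int parallelismLowerLimit) {
        // Step 1: scan upwards for a parallelism that divides numPartitions evenly,
        // so every subtask reads the same number of partitions.
        for (int p = newParallelism; p <= upperBound && p <= numPartitions; p++) {
            if (numPartitions % p == 0) {
                return p;
            }
        }

        // Step 2: no even divisor exists below upperBound, so scaling further up would not
        // raise throughput (the busiest subtask still reads the same number of partitions).
        // Scan downwards for the smallest p that keeps ceil(numPartitions / p) unchanged.
        int maxPartitionsPerSubtask = ceilDiv(numPartitions, newParallelism);
        for (int p = newParallelism - 1; p >= parallelismLowerLimit; p--) {
            if (ceilDiv(numPartitions, p) > maxPartitionsPerSubtask) {
                // p went one step too far: the previous value is the smallest one whose
                // busiest subtask reads no more partitions than at newParallelism.
                return p + 1;
            }
        }

        // Step 3: even the lower limit keeps the busiest subtask unchanged, so use it.
        return parallelismLowerLimit;
    }

    private static int ceilDiv(int a, int b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        // Worked example from the discussion: 35 partitions, proposed parallelism 20,
        // upper bound 30, lower limit 1 => 18 (17 subtasks read 2 partitions, 1 reads 1).
        System.out.println(adjustToNumPartitions(20, 35, 30, 1));
    }
}
```

With the example inputs (`newParallelism = 20`, `numPartitions = 35`, `upperBound = 30`, `parallelismLowerLimit = 1`) this returns 18, matching the walkthrough above.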