zhuzhurk commented on a change in pull request #17952:
URL: https://github.com/apache/flink/pull/17952#discussion_r764533772
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismDecider.java
##########
@@ -81,52 +88,36 @@ private int calculateParallelism(List<BlockingResultInfo>
consumedResults) {
.reduce(0L, Long::sum))
.sum();
- if (broadcastBytes > dataVolumePerTask
- || (broadcastBytes == dataVolumePerTask && nonBroadcastBytes >
0)) {
- LOG.warn(
- "The minimum size of one task to process is larger than "
- + "the size of data volume which is configured by "
- + "'"
- +
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key()
- + "'. "
- + "Parallelism will be set to {}.",
- maxParallelism);
-
- return maxParallelism;
- } else if (broadcastBytes == dataVolumePerTask) {
- return minParallelism;
- } else {
- int parallelism =
- (int)
- Math.ceil(
- (double) nonBroadcastBytes
- / (dataVolumePerTask -
broadcastBytes));
- parallelism = Math.max(parallelism, minParallelism);
- parallelism = Math.min(parallelism, maxParallelism);
- return parallelism;
- }
- }
-
- /** The factory to instantiate {@link DefaultVertexParallelismDecider}. */
- public static class Factory implements VertexParallelismDecider.Factory {
+ long expectedMaxBroadcastBytes =
+ (long) Math.ceil((dataVolumePerTask * CAP_RATIO_OF_BROADCAST));
- private final Configuration configuration;
+ if (broadcastBytes > expectedMaxBroadcastBytes) {
+ LOG.info(
+ "The number of broadcast bytes: {} is larger than the
expected maximum value: {} ('{}' * {})."
+ + " Use the expected maximum value as the number
of broadcast bytes to decide the parallelism.",
+ broadcastBytes,
+ expectedMaxBroadcastBytes,
+
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK.key(),
+ CAP_RATIO_OF_BROADCAST);
- public Factory(Configuration configuration) {
- this.configuration = configuration;
+ broadcastBytes = expectedMaxBroadcastBytes;
}
- @Override
- public VertexParallelismDecider create() {
- return new DefaultVertexParallelismDecider(
- configuration.getInteger(
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_MAX_PARALLELISM),
- configuration.getInteger(
-
JobManagerOptions.ADAPTIVE_BACH_SCHEDULER_MIN_PARALLELISM),
- configuration.get(
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DATA_VOLUME_PER_TASK),
- configuration.get(
-
JobManagerOptions.ADAPTIVE_BATCH_SCHEDULER_DEFAULT_SOURCE_PARALLELISM));
- }
+ int parallelism =
+ (int) Math.ceil((double) nonBroadcastBytes /
(dataVolumePerTask - broadcastBytes));
+ parallelism = Math.max(parallelism, minParallelism);
+ parallelism = Math.min(parallelism, maxParallelism);
+ return parallelism;
+ }
+
+ public static DefaultVertexParallelismDecider from(Configuration
configuration) {
+ return new DefaultVertexParallelismDecider(
Review comment:
I think we need to validate the parameters (e.g. with `Preconditions.checkArgument`): minParallelism > 0,
minParallelism <= maxParallelism, dataVolumePerTask > 0, and
defaultSourceParallelism > 0.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]