zhuzhurk commented on code in PR #21646:
URL: https://github.com/apache/flink/pull/21646#discussion_r1068999257
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDecider.java:
##########
@@ -158,17 +150,17 @@ private ParallelismAndInputInfos decideParallelismAndEvenlyDistributeSubpartitio
int decideParallelism(JobVertexID jobVertexId, List<BlockingResultInfo> consumedResults) {
checkArgument(!consumedResults.isEmpty());
- long broadcastBytes = getReasonableBroadcastBytes(jobVertexId, consumedResults);
- long nonBroadcastBytes = getNonBroadcastBytes(consumedResults);
-
- int parallelism =
- (int) Math.ceil((double) nonBroadcastBytes / (dataVolumePerTask - broadcastBytes));
+ // Considering that the sizes of broadcast results are usually very small, we compute the
+ // parallelism only based on sizes of non-broadcast results
+ long totalBytes =
+ getNonBroadcastResultInfos(consumedResults).stream()
+ .mapToLong(BlockingResultInfo::getNumBytesProduced)
+ .sum();
+ int parallelism = (int) Math.ceil((double) totalBytes / dataVolumePerTask);
LOG.debug(
- "The size of broadcast data is {}, the size of non-broadcast data is {}, "
- + "the initially decided parallelism of job vertex {} is {}.",
- new MemorySize(broadcastBytes),
- new MemorySize(nonBroadcastBytes),
+ "The total size of data is {}, the initially decided parallelism of job vertex {} is {}.",
Review Comment:
data -> non-broadcast data
This is needed to avoid confusion, since the total now excludes broadcast results.
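To make the review point concrete, here is a minimal sketch of the new computation. It assumes a hypothetical `ResultInfo` class in place of Flink's `BlockingResultInfo` and uses `System.out` instead of the SLF4J logger; it is an illustration, not the PR's implementation. Only non-broadcast bytes feed `Math.ceil(bytes / dataVolumePerTask)`, so the log line should say "non-broadcast data".

```java
import java.util.List;

/** Hypothetical, simplified sketch; class and field names are not Flink's API. */
public class ParallelismSketch {

    /** Hypothetical stand-in for BlockingResultInfo, keeping only what the sketch needs. */
    static final class ResultInfo {
        final boolean broadcast;
        final long numBytesProduced;

        ResultInfo(boolean broadcast, long numBytesProduced) {
            this.broadcast = broadcast;
            this.numBytesProduced = numBytesProduced;
        }
    }

    /** Broadcast results are ignored; only non-broadcast bytes determine the parallelism. */
    static int decideParallelism(List<ResultInfo> consumedResults, long dataVolumePerTask) {
        if (consumedResults.isEmpty()) {
            throw new IllegalArgumentException("consumedResults must not be empty");
        }
        long nonBroadcastBytes =
                consumedResults.stream()
                        .filter(r -> !r.broadcast)
                        .mapToLong(r -> r.numBytesProduced)
                        .sum();
        int parallelism = (int) Math.ceil((double) nonBroadcastBytes / dataVolumePerTask);
        // Wording follows the review suggestion: "non-broadcast data" rather than "data".
        System.out.printf(
                "The total size of non-broadcast data is %d bytes, the initially decided parallelism is %d.%n",
                nonBroadcastBytes, parallelism);
        return parallelism;
    }

    public static void main(String[] args) {
        List<ResultInfo> results =
                List.of(
                        new ResultInfo(false, 1000L),
                        new ResultInfo(false, 500L),
                        new ResultInfo(true, 10_000L));
        // 1500 non-broadcast bytes / 400 bytes per task = 3.75, rounded up to 4.
        System.out.println(decideParallelism(results, 400L));
    }
}
```

The 10,000-byte broadcast result does not affect the outcome here. Under the removed formula it would have been subtracted from `dataVolumePerTask`, which is why that version needed a "reasonable" cap on broadcast bytes. The log calls the value "initially decided", which suggests later adjustment; this sketch omits any such step.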
--
This is an automated message from the Apache Git Service.