This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new b25dfaee807 [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex b25dfaee807 is described below commit b25dfaee80727d6662a5fd445fe51cc139a8b9eb Author: Lijie Wang <wangdachui9...@gmail.com> AuthorDate: Wed Dec 27 23:13:05 2023 +0800 [FLINK-33968][runtime] Advance the calculation of num of subpartitions to the time of initializing execution job vertex This closes #24019. --- .../IntermediateResultPartition.java | 44 ++++++++++++---------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java index 00e99674371..e132f9079f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java @@ -43,8 +43,8 @@ public class IntermediateResultPartition { private final EdgeManager edgeManager; - /** Number of subpartitions. Initialized lazily and will not change once set. */ - private int numberOfSubpartitions = NUM_SUBPARTITIONS_UNKNOWN; + /** Number of subpartitions for dynamic graph. */ + private final int numberOfSubpartitionsForDynamicGraph; /** Whether this partition has produced all data. 
*/ private boolean dataAllProduced = false; @@ -64,6 +64,17 @@ public class IntermediateResultPartition { this.producer = producer; this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber); this.edgeManager = edgeManager; + + if (!producer.getExecutionGraphAccessor().isDynamic()) { + this.numberOfSubpartitionsForDynamicGraph = NUM_SUBPARTITIONS_UNKNOWN; + } else { + this.numberOfSubpartitionsForDynamicGraph = + computeNumberOfSubpartitionsForDynamicGraph(); + checkState( + numberOfSubpartitionsForDynamicGraph > 0, + "Number of subpartitions is an unexpected value: " + + numberOfSubpartitionsForDynamicGraph); + } } public void markPartitionGroupReleasable(ConsumedPartitionGroup partitionGroup) { @@ -114,17 +125,6 @@ public class IntermediateResultPartition { } public int getNumberOfSubpartitions() { - if (numberOfSubpartitions == NUM_SUBPARTITIONS_UNKNOWN) { - numberOfSubpartitions = computeNumberOfSubpartitions(); - checkState( - numberOfSubpartitions > 0, - "Number of subpartitions is an unexpected value: " + numberOfSubpartitions); - } - - return numberOfSubpartitions; - } - - private int computeNumberOfSubpartitions() { if (!getProducer().getExecutionGraphAccessor().isDynamic()) { List<ConsumerVertexGroup> consumerVertexGroups = getConsumerVertexGroups(); checkState(!consumerVertexGroups.isEmpty()); @@ -134,13 +134,17 @@ public class IntermediateResultPartition { // for non-dynamic graph. return consumerVertexGroups.get(0).size(); } else { - if (totalResult.isBroadcast()) { - // for dynamic graph and broadcast result, we only produced one subpartition, - // and all the downstream vertices should consume this subpartition. 
- return 1; - } else { - return computeNumberOfMaxPossiblePartitionConsumers(); - } + return numberOfSubpartitionsForDynamicGraph; + } + } + + private int computeNumberOfSubpartitionsForDynamicGraph() { + if (totalResult.isBroadcast()) { + // for dynamic graph and broadcast result, we only produced one subpartition, + // and all the downstream vertices should consume this subpartition. + return 1; + } else { + return computeNumberOfMaxPossiblePartitionConsumers(); } }