This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new b25dfaee807 [FLINK-33968][runtime] Advance the calculation of num of
subpartitions to the time of initializing execution job vertex
b25dfaee807 is described below
commit b25dfaee80727d6662a5fd445fe51cc139a8b9eb
Author: Lijie Wang <[email protected]>
AuthorDate: Wed Dec 27 23:13:05 2023 +0800
[FLINK-33968][runtime] Advance the calculation of num of subpartitions to
the time of initializing execution job vertex
This closes #24019.
---
.../IntermediateResultPartition.java | 44 ++++++++++++----------
1 file changed, 24 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 00e99674371..e132f9079f1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -43,8 +43,8 @@ public class IntermediateResultPartition {
private final EdgeManager edgeManager;
- /** Number of subpartitions. Initialized lazily and will not change once set. */
- private int numberOfSubpartitions = NUM_SUBPARTITIONS_UNKNOWN;
+ /** Number of subpartitions for dynamic graph. */
+ private final int numberOfSubpartitionsForDynamicGraph;
/** Whether this partition has produced all data. */
private boolean dataAllProduced = false;
@@ -64,6 +64,17 @@ public class IntermediateResultPartition {
this.producer = producer;
this.partitionId = new IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
this.edgeManager = edgeManager;
+
+ if (!producer.getExecutionGraphAccessor().isDynamic()) {
+ this.numberOfSubpartitionsForDynamicGraph = NUM_SUBPARTITIONS_UNKNOWN;
+ } else {
+ this.numberOfSubpartitionsForDynamicGraph =
+ computeNumberOfSubpartitionsForDynamicGraph();
+ checkState(
+ numberOfSubpartitionsForDynamicGraph > 0,
+ "Number of subpartitions is an unexpected value: "
+ + numberOfSubpartitionsForDynamicGraph);
+ }
}
public void markPartitionGroupReleasable(ConsumedPartitionGroup partitionGroup) {
@@ -114,17 +125,6 @@ public class IntermediateResultPartition {
}
public int getNumberOfSubpartitions() {
- if (numberOfSubpartitions == NUM_SUBPARTITIONS_UNKNOWN) {
- numberOfSubpartitions = computeNumberOfSubpartitions();
- checkState(
- numberOfSubpartitions > 0,
- "Number of subpartitions is an unexpected value: " + numberOfSubpartitions);
- }
-
- return numberOfSubpartitions;
- }
-
- private int computeNumberOfSubpartitions() {
if (!getProducer().getExecutionGraphAccessor().isDynamic()) {
List<ConsumerVertexGroup> consumerVertexGroups = getConsumerVertexGroups();
checkState(!consumerVertexGroups.isEmpty());
@@ -134,13 +134,17 @@ public class IntermediateResultPartition {
// for non-dynamic graph.
return consumerVertexGroups.get(0).size();
} else {
- if (totalResult.isBroadcast()) {
- // for dynamic graph and broadcast result, we only produced one subpartition,
- // and all the downstream vertices should consume this subpartition.
- return 1;
- } else {
- return computeNumberOfMaxPossiblePartitionConsumers();
- }
+ return numberOfSubpartitionsForDynamicGraph;
+ }
+ }
+
+ private int computeNumberOfSubpartitionsForDynamicGraph() {
+ if (totalResult.isBroadcast()) {
+ // for dynamic graph and broadcast result, we only produced one subpartition,
+ // and all the downstream vertices should consume this subpartition.
+ return 1;
+ } else {
+ return computeNumberOfMaxPossiblePartitionConsumers();
}
}