This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new b25dfaee807 [FLINK-33968][runtime] Advance the calculation of num of 
subpartitions to the time of initializing execution job vertex
b25dfaee807 is described below

commit b25dfaee80727d6662a5fd445fe51cc139a8b9eb
Author: Lijie Wang <wangdachui9...@gmail.com>
AuthorDate: Wed Dec 27 23:13:05 2023 +0800

    [FLINK-33968][runtime] Advance the calculation of num of subpartitions to 
the time of initializing execution job vertex
    
    This closes #24019.
---
 .../IntermediateResultPartition.java               | 44 ++++++++++++----------
 1 file changed, 24 insertions(+), 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
index 00e99674371..e132f9079f1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java
@@ -43,8 +43,8 @@ public class IntermediateResultPartition {
 
     private final EdgeManager edgeManager;
 
-    /** Number of subpartitions. Initialized lazily and will not change once 
set. */
-    private int numberOfSubpartitions = NUM_SUBPARTITIONS_UNKNOWN;
+    /** Number of subpartitions for dynamic graph. */
+    private final int numberOfSubpartitionsForDynamicGraph;
 
     /** Whether this partition has produced all data. */
     private boolean dataAllProduced = false;
@@ -64,6 +64,17 @@ public class IntermediateResultPartition {
         this.producer = producer;
         this.partitionId = new 
IntermediateResultPartitionID(totalResult.getId(), partitionNumber);
         this.edgeManager = edgeManager;
+
+        if (!producer.getExecutionGraphAccessor().isDynamic()) {
+            this.numberOfSubpartitionsForDynamicGraph = 
NUM_SUBPARTITIONS_UNKNOWN;
+        } else {
+            this.numberOfSubpartitionsForDynamicGraph =
+                    computeNumberOfSubpartitionsForDynamicGraph();
+            checkState(
+                    numberOfSubpartitionsForDynamicGraph > 0,
+                    "Number of subpartitions is an unexpected value: "
+                            + numberOfSubpartitionsForDynamicGraph);
+        }
     }
 
     public void markPartitionGroupReleasable(ConsumedPartitionGroup 
partitionGroup) {
@@ -114,17 +125,6 @@ public class IntermediateResultPartition {
     }
 
     public int getNumberOfSubpartitions() {
-        if (numberOfSubpartitions == NUM_SUBPARTITIONS_UNKNOWN) {
-            numberOfSubpartitions = computeNumberOfSubpartitions();
-            checkState(
-                    numberOfSubpartitions > 0,
-                    "Number of subpartitions is an unexpected value: " + 
numberOfSubpartitions);
-        }
-
-        return numberOfSubpartitions;
-    }
-
-    private int computeNumberOfSubpartitions() {
         if (!getProducer().getExecutionGraphAccessor().isDynamic()) {
             List<ConsumerVertexGroup> consumerVertexGroups = 
getConsumerVertexGroups();
             checkState(!consumerVertexGroups.isEmpty());
@@ -134,13 +134,17 @@ public class IntermediateResultPartition {
             // for non-dynamic graph.
             return consumerVertexGroups.get(0).size();
         } else {
-            if (totalResult.isBroadcast()) {
-                // for dynamic graph and broadcast result, we only produced 
one subpartition,
-                // and all the downstream vertices should consume this 
subpartition.
-                return 1;
-            } else {
-                return computeNumberOfMaxPossiblePartitionConsumers();
-            }
+            return numberOfSubpartitionsForDynamicGraph;
+        }
+    }
+
+    private int computeNumberOfSubpartitionsForDynamicGraph() {
+        if (totalResult.isBroadcast()) {
+            // for dynamic graph and broadcast result, we only produced one 
subpartition,
+            // and all the downstream vertices should consume this 
subpartition.
+            return 1;
+        } else {
+            return computeNumberOfMaxPossiblePartitionConsumers();
         }
     }
 

Reply via email to