zhuzhurk commented on code in PR #20350:
URL: https://github.com/apache/flink/pull/20350#discussion_r938582717


##########
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java:
##########
@@ -249,6 +251,24 @@ public static void completeCancellingForAllVertices(ExecutionGraph eg) {
         }
     }
 
+    public static void finishJobVertex(ExecutionGraph executionGraph, JobVertexID jobVertexID) {
+

Review Comment:
   Rename `jobVertexID` -> `jobVertexId`.
   
   I'd also prefer to remove this empty line.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java:
##########
@@ -718,31 +716,37 @@ public CompletableFuture<?> suspend() {
     }
 
     private void updatePartitionConsumers(final IntermediateResultPartition partition) {
-        final Optional<ConsumerVertexGroup> consumerVertexGroup =
-                partition.getConsumerVertexGroupOptional();
-        if (!consumerVertexGroup.isPresent()) {
+        final List<ConsumerVertexGroup> consumerVertexGroups = partition.getConsumerVertexGroups();
+        if (consumerVertexGroups.isEmpty()) {
             return;
         }
-        for (ExecutionVertexID consumerVertexId : consumerVertexGroup.get()) {
-            final ExecutionVertex consumerVertex =
-                    vertex.getExecutionGraphAccessor().getExecutionVertexOrThrow(consumerVertexId);
-            final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
-            final ExecutionState consumerState = consumer.getState();
-
-            // ----------------------------------------------------------------
-            // Consumer is recovering or running => send update message now
-            // Consumer is deploying => cache the partition info which would be
-            // sent after switching to running
-            // ----------------------------------------------------------------
-            if (consumerState == DEPLOYING
-                    || consumerState == RUNNING
-                    || consumerState == INITIALIZING) {
-                final PartitionInfo partitionInfo = createPartitionInfo(partition);
-
-                if (consumerState == DEPLOYING) {
-                    consumerVertex.cachePartitionInfo(partitionInfo);
-                } else {
-                    consumer.sendUpdatePartitionInfoRpcCall(Collections.singleton(partitionInfo));
+        final Set<ExecutionVertexID> updatedVertices = new HashSet<>();
+        for (ConsumerVertexGroup consumerVertexGroup : consumerVertexGroups) {
+            for (ExecutionVertexID consumerVertexId : consumerVertexGroup) {
+                final ExecutionVertex consumerVertex =
+                        vertex.getExecutionGraphAccessor()
+                                .getExecutionVertexOrThrow(consumerVertexId);
+                final Execution consumer = consumerVertex.getCurrentExecutionAttempt();
+                final ExecutionState consumerState = consumer.getState();
+
+                // ----------------------------------------------------------------
+                // Consumer is recovering or running => send update message now
+                // Consumer is deploying => cache the partition info which would be
+                // sent after switching to running
+                // ----------------------------------------------------------------
+                if ((consumerState == DEPLOYING
+                                || consumerState == RUNNING
+                                || consumerState == INITIALIZING)
+                        && !updatedVertices.contains(consumerVertexId)) {

Review Comment:
   Maybe do the `updatedVertices.contains(consumerVertexId)` check at the
   beginning of the inner loop, to avoid executing unnecessary logic for
   vertices that were already updated?
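
   A minimal, self-contained sketch of the suggested ordering, to make the
   idea concrete. It is not the actual `Execution` code: `State`, `Consumer`,
   and `collectUpdates` are hypothetical stand-ins for `ExecutionState`,
   `ExecutionVertex`, and `updatePartitionConsumers`, and it assumes a vertex
   is added to `updatedVertices` only once an update has been issued for it
   (the diff above does not show where that happens).

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class EarlyDedupSketch {

    // Hypothetical stand-in for ExecutionState (only the states needed here).
    enum State { CREATED, DEPLOYING, INITIALIZING, RUNNING }

    // Hypothetical stand-in for a consumer vertex: an id plus its current state.
    static final class Consumer {
        final String id;
        final State state;

        Consumer(String id, State state) {
            this.id = id;
            this.state = state;
        }
    }

    /** Returns the ids that would receive a partition update, each at most once. */
    static List<String> collectUpdates(List<List<Consumer>> consumerGroups) {
        final Set<String> updatedVertices = new HashSet<>();
        final List<String> updates = new ArrayList<>();
        for (List<Consumer> group : consumerGroups) {
            for (Consumer consumer : group) {
                // Early check: skip before doing any lookup or state inspection.
                if (updatedVertices.contains(consumer.id)) {
                    continue;
                }
                if (consumer.state == State.DEPLOYING
                        || consumer.state == State.RUNNING
                        || consumer.state == State.INITIALIZING) {
                    updates.add(consumer.id);
                    // Assumption: mark as updated only after an update was issued.
                    updatedVertices.add(consumer.id);
                }
            }
        }
        return updates;
    }
}
```

   With the check first, a vertex that appears in several consumer groups
   costs only one set lookup on its second and later occurrences.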



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -58,6 +66,25 @@ public IntermediateResultPartition(
         this.edgeManager = edgeManager;
     }
 
+    public void releaseConsumedPartitionGroup(ConsumedPartitionGroup partitionGroup) {

Review Comment:
   Rename `releaseConsumedPartitionGroup()` -> `markPartitionGroupReleasable()`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.