xiangqiao123 commented on a change in pull request #18102:
URL: https://github.com/apache/flink/pull/18102#discussion_r772171166



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/failover/flip1/partitionrelease/RegionPartitionGroupReleaseStrategy.java
##########
@@ -144,6 +145,52 @@ private PipelinedRegionExecutionView getPipelinedRegionExecutionViewForVertex(
         return releasablePartitionGroups;
     }
 
+    @VisibleForTesting
+    public boolean isRegionOfVertexFinished(final ExecutionVertexID executionVertexId) {
+        final PipelinedRegionExecutionView regionExecutionView =
+                getPipelinedRegionExecutionViewForVertex(executionVertexId);
+        return regionExecutionView.isFinished();
+    }
+
+    @Override
+    public void notifySchedulingTopologyUpdated(
+            SchedulingTopology schedulingTopology, List<ExecutionVertexID> newlyAddedVertices) {
+
+        for (SchedulingPipelinedRegion pipelinedRegion :
+                schedulingTopology.getAllPipelinedRegions()) {
+
+            final SchedulingExecutionVertex anyRegionVertex =
+                    pipelinedRegion.getVertices().iterator().next();
+
+            // TODO: check all the vertices are newly added.
+
+            // build PipelinedRegionExecutionView only for new regions to not affect existing views
+            // which may already record task finish status
+            if (!regionExecutionViewByVertex.containsKey(anyRegionVertex.getId())) {
+                final PipelinedRegionExecutionView regionExecutionView =
+                        new PipelinedRegionExecutionView(pipelinedRegion);
+                for (SchedulingExecutionVertex executionVertexId : pipelinedRegion.getVertices()) {
+                    regionExecutionViewByVertex.put(executionVertexId.getId(), regionExecutionView);
+                }
+
+                for (ConsumedPartitionGroup consumedPartitionGroup :
+                        pipelinedRegion.getAllBlockingConsumedPartitionGroups()) {
+                    partitionGroupConsumerRegions
+                            .computeIfAbsent(
+                                    consumedPartitionGroup,
+                                    g -> {
+                                        ConsumerRegionGroupExecutionView regionGroup =
+                                                new ConsumerRegionGroupExecutionView();
+                                        this.consumerRegionGroupExecutionViewMaintainer
+                                                .notifyNewRegionGroup(regionGroup);

Review comment:
       At this point, `regionGroup` is newly created and still empty, so calling
`notifyNewRegionGroup` on it has no effect.
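
A minimal sketch of the concern, assuming the maintainer indexes a group by the regions it contains at the moment it is notified. The classes `RegionGroup` and `Maintainer` and the helper methods below are hypothetical stand-ins for illustration, not the Flink classes from this PR:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model only; none of these types are the real Flink classes.
public class RegionGroupSketch {

    /** Hypothetical stand-in for a consumer region group view. */
    static final class RegionGroup {
        private final Set<String> regions = new HashSet<>();

        void add(String region) {
            regions.add(region);
        }

        Set<String> regions() {
            return regions;
        }
    }

    /** Hypothetical maintainer (assumption): indexes a group by its regions when notified. */
    static final class Maintainer {
        private final Map<String, List<RegionGroup>> groupsByRegion = new HashMap<>();

        void notifyNewRegionGroup(RegionGroup group) {
            // An empty group contributes no entries here.
            for (String region : group.regions()) {
                groupsByRegion.computeIfAbsent(region, r -> new ArrayList<>()).add(group);
            }
        }

        int groupCountFor(String region) {
            return groupsByRegion
                    .getOrDefault(region, Collections.<RegionGroup>emptyList())
                    .size();
        }
    }

    /** Order used in the diff: notify while the group is still empty, then fill it. */
    static int notifyThenAdd(String region) {
        Maintainer maintainer = new Maintainer();
        RegionGroup group = new RegionGroup();
        maintainer.notifyNewRegionGroup(group);
        group.add(region);
        return maintainer.groupCountFor(region);
    }

    /** Alternative order: fill the group first, then notify. */
    static int addThenNotify(String region) {
        Maintainer maintainer = new Maintainer();
        RegionGroup group = new RegionGroup();
        group.add(region);
        maintainer.notifyNewRegionGroup(group);
        return maintainer.groupCountFor(region);
    }

    public static void main(String[] args) {
        System.out.println("notify then add: " + notifyThenAdd("region-1"));
        System.out.println("add then notify: " + addThenNotify("region-1"));
    }
}
```

Under that assumption, the group is only visible to the maintainer when it is notified after its regions have been added.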



