Thesharing commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r692107103



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1279,10 +1279,14 @@ private void releasePartitions(final List<IntermediateResultPartitionID> releasa
 
             // Remove cached ShuffleDescriptor when partition is released
             releasablePartitions.stream()
-                    .map(IntermediateResultPartitionID::getIntermediateDataSetID)
+                    .map(edgeManager::getConsumedPartitionGroupsById)
+                    .flatMap(List::stream)
                     .distinct()
-                    .map(intermediateResults::get)
-                    .forEach(IntermediateResult::notifyPartitionChanged);
+                    .forEach(
+                            group ->
+                                    intermediateResults
+                                            .get(group.getFirst().getIntermediateDataSetID())
+                                            .notifyPartitionChanged(group));

Review comment:
       > Why is it possible that we release all consumer groups based on a 
released partition? If I understand the code correctly, then it should be 
possible that a single result partition belongs to one or more 
ConsumerPartitionGroups. If this is correct, then all consuming 
ConsumerVertexGroups need to finish in order to be allowed to remove the cached 
entries for every ConsumerPartitionGroups. Is this guaranteed at this point?
   
   > I think I don't fully understand the contracts of the releasePartitions 
method. From a quick glance, it does not look correct that we release the 
cached ShuffleDescriptors of all ConsumerPartitionGroups based on a released 
IntermediateResultPartitionID unless we can guarantee that releasablePartitions 
only contains partitions that belong to ConsumerPartitionGroups that are fully 
consumed.
   
   I think it's guaranteed semantically. 
   
   A result partition can be released only when all of its consumers have finished.
A ConsumedPartitionGroup contains all the isomorphic partitions that are consumed
by the same consumer vertices, and these consumer vertices form a
ConsumerVertexGroup (illustrated in FLINK-21110).
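
   To make the grouping concrete, here is a rough sketch for the two simplest distribution patterns. It is a simplified model, not Flink's EdgeManager code: `Grouping` is a hypothetical record, partitions and vertices are plain strings, and only an all-to-all edge and a one-to-one forward edge with equal parallelism are covered. It assumes Java 16+ for records.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model of how partitions and consumers pair up; not Flink's classes. */
public class GroupingSketch {

    /** One ConsumedPartitionGroup together with the ConsumerVertexGroup that reads it. */
    record Grouping(List<String> consumedPartitionGroup, List<String> consumerVertexGroup) {}

    /** All-to-all: every consumer reads every partition, so a single pair covers the edge. */
    static List<Grouping> allToAll(int producers, int consumers) {
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < producers; i++) {
            partitions.add("P" + i);
        }
        List<String> vertices = new ArrayList<>();
        for (int j = 0; j < consumers; j++) {
            vertices.add("V" + j);
        }
        return List.of(new Grouping(partitions, vertices));
    }

    /** Forward (one-to-one, equal parallelism): partition i is read only by vertex i. */
    static List<Grouping> forward(int parallelism) {
        List<Grouping> groupings = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            groupings.add(new Grouping(List.of("P" + i), List.of("V" + i)));
        }
        return groupings;
    }

    public static void main(String[] args) {
        System.out.println(allToAll(3, 2));
        System.out.println(forward(2));
    }
}
```

In this model, the number of pairs depends on the distribution pattern rather than on the product of the parallelisms.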
   
   Therefore, if a partition is releasable, all of its consuming ConsumerVertexGroups
have finished. That means the ConsumedPartitionGroups corresponding to these
ConsumerVertexGroups are releasable as well, so we can remove their cached entries.
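
   The releasability argument can be sketched as follows. This assumes that a partition is releasable only when every group it belongs to is fully consumed. The names are hypothetical, strings stand in for the real IDs, and this models the rule described above rather than Flink's `PartitionReleaseStrategy`.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical model of the release rule; not Flink's PartitionReleaseStrategy. */
public class ReleasabilitySketch {

    record ConsumedPartitionGroup(String name, List<String> partitions) {}

    /** A group is fully consumed once every vertex of its ConsumerVertexGroup has finished. */
    static boolean isFullyConsumed(
            ConsumedPartitionGroup group,
            Map<ConsumedPartitionGroup, List<String>> consumerVertexGroups,
            Set<String> finishedVertices) {
        return finishedVertices.containsAll(consumerVertexGroups.get(group));
    }

    /**
     * A partition is releasable only if every group it belongs to is fully consumed, so
     * "partition releasable" implies "each of its groups is fully consumed" by construction.
     */
    static boolean isReleasable(
            String partition,
            Map<String, List<ConsumedPartitionGroup>> groupsOfPartition,
            Map<ConsumedPartitionGroup, List<String>> consumerVertexGroups,
            Set<String> finishedVertices) {
        return groupsOfPartition.getOrDefault(partition, List.of()).stream()
                .allMatch(g -> isFullyConsumed(g, consumerVertexGroups, finishedVertices));
    }

    public static void main(String[] args) {
        ConsumedPartitionGroup g1 = new ConsumedPartitionGroup("g1", List.of("P0", "P1"));
        ConsumedPartitionGroup g2 = new ConsumedPartitionGroup("g2", List.of("P1"));
        Map<String, List<ConsumedPartitionGroup>> groupsOfPartition =
                Map.of("P0", List.of(g1), "P1", List.of(g1, g2));
        Map<ConsumedPartitionGroup, List<String>> consumers =
                Map.of(g1, List.of("V0"), g2, List.of("V1"));
        // P1 also belongs to g2, whose consumer V1 has not finished yet.
        System.out.println(isReleasable("P1", groupsOfPartition, consumers, Set.of("V0"))); // false
        System.out.println(isReleasable("P1", groupsOfPartition, consumers, Set.of("V0", "V1"))); // true
    }
}
```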
   
   Besides, once a partition is released, it no longer exists, so the cached entries
for the ConsumedPartitionGroups it belongs to are no longer valid and should be
removed.
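
   A minimal sketch of the invalidation step, mirroring the stream pipeline in the diff. Each released partition is mapped to all of its ConsumedPartitionGroups, duplicates are dropped, and each group's cached entry is removed. The plain `Map` cache and the string IDs are assumptions for illustration. In the diff itself, this is done through `notifyPartitionChanged(group)` on the owning IntermediateResult.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the cache invalidation in the diff; not Flink's classes. */
public class CacheInvalidationSketch {

    record ConsumedPartitionGroup(String name, List<String> partitions) {}

    // Partition -> every ConsumedPartitionGroup it belongs to (may be more than one).
    final Map<String, List<ConsumedPartitionGroup>> groupsOfPartition = new HashMap<>();
    // Stand-in for the cached ShuffleDescriptors, one entry per group.
    final Map<ConsumedPartitionGroup, String> cachedDescriptors = new HashMap<>();

    void register(ConsumedPartitionGroup group) {
        for (String partition : group.partitions()) {
            groupsOfPartition.computeIfAbsent(partition, k -> new ArrayList<>()).add(group);
        }
        cachedDescriptors.put(group, "descriptors-of-" + group.name());
    }

    /** Drops the cached entry of every group that contains at least one released partition. */
    void releasePartitions(List<String> releasablePartitions) {
        releasablePartitions.stream()
                .map(p -> groupsOfPartition.getOrDefault(p, List.of()))
                .flatMap(List::stream)
                .distinct()
                .forEach(cachedDescriptors::remove);
    }

    public static void main(String[] args) {
        CacheInvalidationSketch sketch = new CacheInvalidationSketch();
        ConsumedPartitionGroup g1 = new ConsumedPartitionGroup("g1", List.of("P0", "P1"));
        ConsumedPartitionGroup g2 = new ConsumedPartitionGroup("g2", List.of("P1", "P2"));
        ConsumedPartitionGroup g3 = new ConsumedPartitionGroup("g3", List.of("P3"));
        sketch.register(g1);
        sketch.register(g2);
        sketch.register(g3);
        sketch.releasePartitions(List.of("P1"));
        // P1 belongs to g1 and g2, so both entries are gone and only g3 remains cached.
        System.out.println(sketch.cachedDescriptors.keySet());
    }
}
```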
   
   I'll add more comments in the code here.
   
   The semantics "if a partition is releasable, all of its consuming
ConsumerVertexGroups have finished, so the corresponding ConsumedPartitionGroups
are releasable" are guaranteed by `PartitionReleaseStrategy#vertexFinished`.
Maybe we should change the interface
`List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID finishedVertex)`
to `List<ConsumedPartitionGroup> vertexFinished(ExecutionVertexID finishedVertex)`?
That way, we can make sure the whole ConsumedPartitionGroup is released together.
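
   A rough sketch of what the proposed signature could look like. `GroupReleaseStrategy` and `WholeGroupReleaseStrategy` are hypothetical names, and `String` stands in for `ExecutionVertexID`. The implementation simply rescans all groups on each call, which is fine for illustration but would need an index for large graphs.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical sketch of a group-based release strategy; not Flink's interface. */
public class GroupReleaseSketch {

    record ConsumedPartitionGroup(String name, List<String> partitions) {}

    /** Proposed shape: return whole groups instead of individual partition IDs. */
    interface GroupReleaseStrategy {
        List<ConsumedPartitionGroup> vertexFinished(String finishedVertex);
    }

    /** Releases a group exactly once, after every vertex of its consumer vertex group finished. */
    static final class WholeGroupReleaseStrategy implements GroupReleaseStrategy {
        private final Map<ConsumedPartitionGroup, Set<String>> consumers;
        private final Set<String> finished = new HashSet<>();
        private final Set<ConsumedPartitionGroup> released = new HashSet<>();

        WholeGroupReleaseStrategy(Map<ConsumedPartitionGroup, Set<String>> consumers) {
            this.consumers = consumers;
        }

        @Override
        public List<ConsumedPartitionGroup> vertexFinished(String finishedVertex) {
            finished.add(finishedVertex);
            List<ConsumedPartitionGroup> releasable = new ArrayList<>();
            for (Map.Entry<ConsumedPartitionGroup, Set<String>> entry : consumers.entrySet()) {
                ConsumedPartitionGroup group = entry.getKey();
                if (!released.contains(group) && finished.containsAll(entry.getValue())) {
                    released.add(group);
                    releasable.add(group);
                }
            }
            return releasable;
        }
    }

    public static void main(String[] args) {
        ConsumedPartitionGroup group = new ConsumedPartitionGroup("g", List.of("P0", "P1"));
        GroupReleaseStrategy strategy =
                new WholeGroupReleaseStrategy(Map.of(group, Set.of("V0", "V1")));
        System.out.println(strategy.vertexFinished("V0")); // [] because V1 is still running
        System.out.println(strategy.vertexFinished("V1")); // the whole group, released together
    }
}
```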
   
   > I think part of the problem for me is the constant mapping between 
partitions, consumer partition groups and consumer vertex groups at various 
places in the code. I have the feeling that this logic is a bit too spread to 
be easily understandable.
   
   Yes, I agree that the concepts of ConsumedPartitionGroup and ConsumerVertexGroup
are not easy to understand. But to reduce the computational complexity of
traversing the graph and improve the performance of the scheduler, I think it's
necessary to group the isomorphic vertices and partitions. Would it help if I
wrote a detailed document explaining the concepts of ConsumedPartitionGroup and
ConsumerVertexGroup?
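
   As a rough illustration of the complexity argument, consider an all-to-all edge. Materializing one edge per (partition, consumer) pair grows with the product of the parallelisms. Grouping keeps only the group memberships, which grow with their sum. The sketch below only does this arithmetic, and the method names are hypothetical.

```java
/** Back-of-the-envelope comparison for one all-to-all edge; names are hypothetical. */
public class EdgeCountSketch {

    /** One explicit edge per (partition, consumer vertex) pair. */
    static long explicitEdges(long partitions, long consumers) {
        return partitions * consumers;
    }

    /**
     * With one ConsumedPartitionGroup and one ConsumerVertexGroup, each partition and each
     * consumer vertex is listed once.
     */
    static long groupedEntries(long partitions, long consumers) {
        return partitions + consumers;
    }

    public static void main(String[] args) {
        System.out.println(explicitEdges(1_000, 1_000));  // 1000000
        System.out.println(groupedEntries(1_000, 1_000)); // 2000
    }
}
```

For two job vertices with a parallelism of 1,000 each, that is 1,000,000 explicit edges versus 2,000 group entries.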




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
