tillrohrmann commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r692281022



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1279,10 +1279,14 @@ private void releasePartitions(final List<IntermediateResultPartitionID> releasa
 
             // Remove cached ShuffleDescriptor when partition is released
             releasablePartitions.stream()
-                    .map(IntermediateResultPartitionID::getIntermediateDataSetID)
+                    .map(edgeManager::getConsumedPartitionGroupsById)
+                    .flatMap(List::stream)
                     .distinct()
-                    .map(intermediateResults::get)
-                    .forEach(IntermediateResult::notifyPartitionChanged);
+                    .forEach(
+                            group ->
+                                    intermediateResults
+                                            .get(group.getFirst().getIntermediateDataSetID())
+                                            .notifyPartitionChanged(group));

Review comment:
   Part of the problem is obviously that I am not fully immersed in the
details of this code. I do understand that we need extended bookkeeping logic
to execute things more efficiently, and the groups are a good solution for that.
   
   I am mainly wondering whether the code could be structured a bit
differently so that related concepts sit closer together. Maybe my point
becomes clearer when looking at the control flow:
   
   1. An `Execution` terminates, which triggers `maybeReleasePartitions`.
   2. `maybeReleasePartitions` checks via `partitionReleaseStrategy.vertexFinished` which partitions are releasable.
   3. Internally, `partitionReleaseStrategy` looks up which `ConsumedPartitionGroup`s are fully consumed.
   4. Based on the `ConsumedPartitionGroup`s, we return which `IntermediateResultPartitionID`s can be released and call `releasePartitions`.
   5. `releasePartitions` tries to rediscover the `ConsumedPartitionGroup`s from the `IntermediateResultPartitionID`s whose `ShuffleDescriptor`s can be released.
   6. We call `stopTrackingAndReleasePartitions`.
   
   In particular, step 5 seems overly complicated because of the reverse
lookup from `IntermediateResultPartitionID` to `ConsumedPartitionGroup`. I
think we already have this information in steps 3 and 4.
   
   Part of the problem is probably that the `EdgeManager` and the group logic
were added after everything else. Maybe it would help to change
`PartitionReleaseStrategy` to return the set of completed
`ConsumedPartitionGroup`s. Then one could clear the cached
`ShuffleDescriptor`s and call `releasePartitions` with the partitions
contained in those groups.
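
   To make the suggestion concrete, here is a rough, self-contained sketch of
the proposed flow. Everything in it is hypothetical: `Group`, `Strategy`, and
`onVertexFinished` are simplified stand-ins, not the actual Flink classes or
signatures, and string IDs replace the real ID types. It only illustrates the
idea that the strategy hands back completed groups, so the caller can clear
the cache per group and derive the partition IDs without a reverse lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: none of these types are Flink classes.
class ReleaseFlowSketch {

    /** Stand-in for a consumed partition group: partitions that are consumed together. */
    static final class Group {
        final List<String> partitionIds;

        Group(List<String> partitionIds) {
            this.partitionIds = partitionIds;
        }
    }

    /** Stand-in for the release strategy; it reports completed groups, not partition IDs. */
    static final class Strategy {
        private final Map<String, List<Group>> groupsByConsumer;
        private final Map<Group, Integer> unfinishedConsumers = new HashMap<>();

        Strategy(Map<String, List<Group>> groupsByConsumer) {
            this.groupsByConsumer = groupsByConsumer;
            // Count how many consumer vertices read each group.
            for (List<Group> groups : groupsByConsumer.values()) {
                for (Group group : groups) {
                    unfinishedConsumers.merge(group, 1, Integer::sum);
                }
            }
        }

        /** Returns the groups whose last consumer just finished (assumes one call per vertex). */
        List<Group> vertexFinished(String vertexId) {
            List<Group> completed = new ArrayList<>();
            for (Group group : groupsByConsumer.getOrDefault(vertexId, Collections.emptyList())) {
                if (unfinishedConsumers.merge(group, -1, Integer::sum) == 0) {
                    completed.add(group);
                }
            }
            return completed;
        }
    }

    /** Clears the cache entry of each completed group and collects its partitions for release. */
    static List<String> onVertexFinished(
            Strategy strategy, String vertexId, Set<Group> cachedDescriptors) {
        List<String> releasable = new ArrayList<>();
        for (Group group : strategy.vertexFinished(vertexId)) {
            cachedDescriptors.remove(group); // no reverse lookup from partition ID to group
            releasable.addAll(group.partitionIds);
        }
        return releasable;
    }

    public static void main(String[] args) {
        Group group = new Group(Arrays.asList("p0", "p1"));
        Map<String, List<Group>> groupsByConsumer = new HashMap<>();
        groupsByConsumer.put("v1", Collections.singletonList(group));
        groupsByConsumer.put("v2", Collections.singletonList(group));
        Set<Group> cache = new HashSet<>(Collections.singleton(group));
        Strategy strategy = new Strategy(groupsByConsumer);

        // Nothing is releasable until both consumers of the group have finished.
        System.out.println(onVertexFinished(strategy, "v1", cache));
        System.out.println(onVertexFinished(strategy, "v2", cache));
    }
}
```

   In this shape, the group is the unit that flows from the strategy to the
caller, and the partition IDs are derived from it only at the point where
the release call needs them.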





