tillrohrmann commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r692281022
##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1279,10 +1279,14 @@ private void releasePartitions(final List<IntermediateResultPartitionID> releasa
// Remove cached ShuffleDescriptor when partition is released
releasablePartitions.stream()
- .map(IntermediateResultPartitionID::getIntermediateDataSetID)
+ .map(edgeManager::getConsumedPartitionGroupsById)
+ .flatMap(List::stream)
.distinct()
- .map(intermediateResults::get)
- .forEach(IntermediateResult::notifyPartitionChanged);
+ .forEach(
+ group ->
+ intermediateResults
+ .get(group.getFirst().getIntermediateDataSetID())
+ .notifyPartitionChanged(group));
Review comment:
Obviously, part of the problem is that I am not fully immersed in the
details of this code. I do understand that we need extended bookkeeping logic
to execute things more efficiently, and the groups are a good solution for that.
I am mainly wondering whether it would be possible to structure the code a
bit differently so that related concepts are located closer together.
Maybe my point becomes clearer when looking at the control flow:

1. An `Execution` terminates, which triggers `maybeReleasePartitions`.
2. `maybeReleasePartitions` checks via
`partitionReleaseStrategy.vertexFinished` which partitions are releasable.
3. Internally, `partitionReleaseStrategy` looks up which
`ConsumedPartitionGroup`s are fully consumed.
4. Based on the `ConsumedPartitionGroup`s, we return which
`IntermediateResultPartitionID`s can be released and call `releasePartitions`.
5. `releasePartitions` tries to rediscover the `ConsumedPartitionGroup`s
based on the `IntermediateResultPartitionID`s whose `ShuffleDescriptor`s can be
released.
6. We call `stopTrackingAndReleasePartitions`.

In particular, step 5 seems overly complicated because of the reverse
lookup from `IntermediateResultPartitionID` to `ConsumedPartitionGroup`. I
think we already have this information in steps 3 and 4.
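
To make the round trip in steps 3 to 5 concrete, here is a rough sketch. It does not use Flink's real classes: partition IDs are plain strings, a group is just a list of IDs, and `buildIndex`, `vertexFinished` and `rediscoverGroups` are hypothetical stand-ins for the strategy and the `EdgeManager` lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class ReverseLookupSketch {
    // Hypothetical stand-ins: a "group" is simply a list of partition IDs.
    static final List<String> GROUP_A = Arrays.asList("p1", "p2");
    static final List<String> GROUP_B = Arrays.asList("p3");

    // Reverse index from partition ID to the groups containing it
    // (plays the role of a lookup like getConsumedPartitionGroupsById).
    static Map<String, List<List<String>>> buildIndex(List<List<String>> groups) {
        Map<String, List<List<String>>> index = new HashMap<>();
        for (List<String> group : groups) {
            for (String id : group) {
                index.computeIfAbsent(id, k -> new ArrayList<>()).add(group);
            }
        }
        return index;
    }

    // Steps 3/4: the strategy knows the fully consumed groups,
    // but hands back only the flattened partition IDs.
    static List<String> vertexFinished(List<List<String>> fullyConsumedGroups) {
        return fullyConsumedGroups.stream()
                .flatMap(List::stream)
                .collect(Collectors.toList());
    }

    // Step 5: the groups are rediscovered from the IDs via the reverse index.
    static Set<List<String>> rediscoverGroups(
            List<String> ids, Map<String, List<List<String>>> index) {
        return ids.stream()
                .map(index::get)
                .flatMap(List::stream)
                .collect(Collectors.toCollection(LinkedHashSet::new));
    }

    public static void main(String[] args) {
        Map<String, List<List<String>>> index =
                buildIndex(Arrays.asList(GROUP_A, GROUP_B));
        List<String> ids = vertexFinished(Arrays.asList(GROUP_A));
        // The result is the same group the strategy already had in hand.
        System.out.println(rediscoverGroups(ids, index));
    }
}
```

The group information is dropped in `vertexFinished` and rebuilt in `rediscoverGroups`, which is the detour I am wondering about.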

Part of the problem is probably that we added the `EdgeManager` and the
group logic after everything else was already in place. Maybe it would help
to change `PartitionReleaseStrategy` to return the set of completed
`ConsumedPartitionGroup`s. Then one could clear the cached
`ShuffleDescriptor`s and then call `releasePartitions` with the partitions
contained in those groups.
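
Very roughly, and only as a sketch under simplifying assumptions, the suggested flow could look like the following. The types are again hypothetical stand-ins (string partition IDs, a list of IDs per group, a cache keyed by group), not the actual Flink signatures.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupBasedReleaseSketch {
    // Hypothetical cache of shuffle descriptors, keyed by consumed group.
    final Map<List<String>, String> cachedShuffleDescriptors = new HashMap<>();
    // Records which partition IDs were handed to releasePartitions.
    final List<String> released = new ArrayList<>();

    // Assumed new contract: the strategy returns the completed groups
    // themselves instead of flattened partition IDs.
    List<List<String>> vertexFinished(List<List<String>> fullyConsumedGroups) {
        return fullyConsumedGroups;
    }

    void maybeReleasePartitions(List<List<String>> completedGroups) {
        List<String> releasable = new ArrayList<>();
        for (List<String> group : completedGroups) {
            // The cache is cleared per group, with no reverse lookup by ID.
            cachedShuffleDescriptors.remove(group);
            releasable.addAll(group);
        }
        releasePartitions(releasable);
    }

    // Stand-in for the existing release path, which still works on IDs.
    void releasePartitions(List<String> partitionIds) {
        released.addAll(partitionIds);
    }

    public static void main(String[] args) {
        GroupBasedReleaseSketch graph = new GroupBasedReleaseSketch();
        List<String> groupA = Arrays.asList("p1", "p2");
        graph.cachedShuffleDescriptors.put(groupA, "descriptor-A");
        graph.maybeReleasePartitions(graph.vertexFinished(Arrays.asList(groupA)));
        System.out.println(graph.released);
    }
}
```

With this shape, cache invalidation and partition release both start from the same set of groups, so the `EdgeManager` lookup inside `releasePartitions` would no longer be needed.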