Thesharing commented on a change in pull request #16856:
URL: https://github.com/apache/flink/pull/16856#discussion_r692107103
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -1279,10 +1279,14 @@ private void releasePartitions(final List<IntermediateResultPartitionID> releasa
         // Remove cached ShuffleDescriptor when partition is released
         releasablePartitions.stream()
-                .map(IntermediateResultPartitionID::getIntermediateDataSetID)
+                .map(edgeManager::getConsumedPartitionGroupsById)
+                .flatMap(List::stream)
                 .distinct()
-                .map(intermediateResults::get)
-                .forEach(IntermediateResult::notifyPartitionChanged);
+                .forEach(
+                        group ->
+                                intermediateResults
+                                        .get(group.getFirst().getIntermediateDataSetID())
+                                        .notifyPartitionChanged(group));
Review comment:
> Why is it possible that we release all consumer groups based on a
> released partition? If I understand the code correctly, then it should be
> possible that a single result partition belongs to one or more
> ConsumerPartitionGroups. If this is correct, then all consuming
> ConsumerVertexGroups need to finish in order to be allowed to remove the cached
> entries for every ConsumerPartitionGroups. Is this guaranteed at this point?

> I think I don't fully understand the contracts of the releasePartitions
> method. From a quick glance, it does not look correct that we release the
> cached ShuffleDescriptors of all ConsumerPartitionGroups based on a released
> IntermediateResultPartitionID unless we can guarantee that releasablePartitions
> only contains partitions that belong to ConsumerPartitionGroups that are fully
> consumed.

I think it's guaranteed semantically.
A result partition can be released only when all its consumers are finished.
A ConsumedPartitionGroup contains all the isomorphic partitions that are
consumed by the same consumer vertices, and these consumer vertices form a
ConsumerVertexGroup (illustrated in FLINK-21110).
Therefore, if a partition is releasable, all ConsumerVertexGroups consuming it
are finished. That means the ConsumedPartitionGroups corresponding to
these ConsumerVertexGroups are releasable, so we can remove the cache for
them.
Besides, once a partition is released, it no longer exists. The cache for the
ConsumedPartitionGroups it belongs to is therefore no longer valid and should
be removed.
I'll add more comments in the code here.
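
To make the argument above concrete, here is a minimal sketch in plain Java. It is not Flink's implementation: `GroupCacheSketch`, the string IDs, and the placeholder cache values are hypothetical stand-ins for `EdgeManager`, `ConsumedPartitionGroup`, and the cached ShuffleDescriptors. It only shows the shape of the logic: map each released partition to the groups that contain it, deduplicate, and drop the cache entry of each affected group.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Simplified, hypothetical model of per-group descriptor caching; not Flink code. */
class GroupCacheSketch {
    // partition id -> ids of the consumed partition groups that contain it
    private final Map<String, List<String>> groupsByPartition = new HashMap<>();
    // consumed partition group id -> cached descriptors (placeholder string)
    private final Map<String, String> descriptorCache = new HashMap<>();

    void addGroup(String groupId, List<String> partitionIds) {
        for (String partitionId : partitionIds) {
            groupsByPartition
                    .computeIfAbsent(partitionId, k -> new ArrayList<>())
                    .add(groupId);
        }
        descriptorCache.put(groupId, "descriptors-of-" + groupId);
    }

    /** Drops the cache entry of every group that contains a released partition. */
    Set<String> releasePartitions(List<String> releasedPartitions) {
        // A set deduplicates groups reached through several partitions.
        Set<String> invalidated = new LinkedHashSet<>();
        for (String partitionId : releasedPartitions) {
            invalidated.addAll(
                    groupsByPartition.getOrDefault(partitionId, Collections.emptyList()));
        }
        invalidated.forEach(descriptorCache::remove);
        return invalidated;
    }

    boolean isCached(String groupId) {
        return descriptorCache.containsKey(groupId);
    }

    public static void main(String[] args) {
        GroupCacheSketch sketch = new GroupCacheSketch();
        sketch.addGroup("g1", Arrays.asList("p1", "p2"));
        sketch.addGroup("g2", Arrays.asList("p3"));
        System.out.println(sketch.releasePartitions(Arrays.asList("p1", "p2")));
        System.out.println(sketch.isCached("g2"));
    }
}
```

In this model, releasing both partitions of `g1` invalidates `g1` exactly once and leaves the cache entry of `g2` untouched.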
The guarantee that "if a partition is releasable, all consuming
ConsumerVertexGroups are finished and the corresponding
ConsumedPartitionGroups are releasable" is provided by
`PartitionReleaseStrategy#vertexFinished`. Maybe we should change the interface
method from `List<IntermediateResultPartitionID> vertexFinished(ExecutionVertexID
finishedVertex)` to `List<ConsumedPartitionGroup>
vertexFinished(ExecutionVertexID finishedVertex)`? That way, we can make
sure a whole ConsumedPartitionGroup is released together.
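
A rough sketch of what a group-returning `vertexFinished` could look like, in plain Java. Everything here is an assumption for illustration: `GroupReleaseStrategySketch`, `registerGroup`, and the string IDs are hypothetical and stand in for the real strategy, `ConsumedPartitionGroup`, and `ExecutionVertexID`. The idea is to track the unfinished consumer vertices per group and report a group only when its last consumer finishes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical release strategy that reports whole groups; not Flink code. */
class GroupReleaseStrategySketch {
    // group id -> consumer vertices of that group which have not finished yet
    private final Map<String, Set<String>> unfinishedConsumersByGroup = new LinkedHashMap<>();

    void registerGroup(String groupId, List<String> consumerVertices) {
        unfinishedConsumersByGroup.put(groupId, new HashSet<>(consumerVertices));
    }

    /**
     * Returns the groups that became releasable because finishedVertex
     * was their last unfinished consumer.
     */
    List<String> vertexFinished(String finishedVertex) {
        List<String> releasableGroups = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : unfinishedConsumersByGroup.entrySet()) {
            Set<String> unfinished = entry.getValue();
            // remove() returns true only if the vertex was still pending for this
            // group, so a group is reported at most once.
            if (unfinished.remove(finishedVertex) && unfinished.isEmpty()) {
                releasableGroups.add(entry.getKey());
            }
        }
        return releasableGroups;
    }

    public static void main(String[] args) {
        GroupReleaseStrategySketch strategy = new GroupReleaseStrategySketch();
        strategy.registerGroup("g1", Arrays.asList("v1", "v2"));
        System.out.println(strategy.vertexFinished("v1"));
        System.out.println(strategy.vertexFinished("v2"));
    }
}
```

Because the caller receives group IDs rather than individual partition IDs, it cannot release only part of a group by accident.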

> I think part of the problem for me is the constant mapping between
> partitions, consumer partition groups and consumer vertex groups at various
> places in the code. I have the feeling that this logic is a bit too spread to
> be easily understandable.

Yes, I agree that the concepts of ConsumedPartitionGroup and
ConsumerVertexGroup are not easy to understand. But to reduce the
computational complexity of traversing the graph and improve the performance of
the scheduler, I think it's necessary to group the isomorphic vertices and
partitions. Would it help if I wrote a detailed document explaining the
concepts of ConsumedPartitionGroup and ConsumerVertexGroup?