azagrebin commented on a change in pull request #8566: [FLINK-12673][network]
Introduce NetworkEnvironment.getUnreleasedPartitions instead of using
getResultPartitionManager
URL: https://github.com/apache/flink/pull/8566#discussion_r288938938
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionManager.java
##########
@@ -122,14 +123,9 @@ void onConsumedPartition(ResultPartition partition) {
}
}
- public boolean areAllPartitionsReleased() {
+ public Collection<ResultPartitionID> getUnreleasedPartitions() {
synchronized (registeredPartitions) {
- for (ResultPartition partition :
registeredPartitions.values()) {
- if (!partition.isReleased()) {
- return false;
- }
- }
- return true;
+ return registeredPartitions.keySet();
Review comment:
Ok, the behaviour did not actually change but this problem was already
before :) If we remove partition in
`ResultPartitionManager#onConsumedPartition` and concurrently continue to
release it, the old `areAllPartitionsReleased` will still skip its state check
because it is simply not in the `registeredPartitions` anymore.
I think we have to fix it here by moving release under the lock as it is
already done in `ResultPartitionManager.releasePartition`.
In general, I think `registeredPartitions` should reflect the release state
of them. At the end, memory occupied by partition in `registeredPartitions` is
also a resource to release so being in `registeredPartitions` and unreleased
should be always the case. If partition is released/unregistered, there is no
point to keep in `registeredPartitions` either.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services