TanYuxin-tyx commented on code in PR #24151:
URL: https://github.com/apache/flink/pull/24151#discussion_r1467398883
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -71,18 +73,35 @@ public class UnionResultSubpartitionView
private final Queue<Tuple2<ResultSubpartition.BufferAndBacklog, Integer>> cachedBuffers =
new LinkedList<>();
+ /**
+ * A collection storing views that have triggered {@link
+ * #notifyDataAvailable(ResultSubpartitionView)} without {@link #notifyViewCreated(int,
+ * ResultSubpartitionView)}. This is used to resolve the race condition between these two
+ * methods.
+ */
+ private final Set<ResultSubpartitionView> unregisteredAvailableViews = new HashSet<>();
+
+ private final int totalNumViews;
Review Comment:
`numTotalViews` may be better here.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -71,18 +73,35 @@ public class UnionResultSubpartitionView
private final Queue<Tuple2<ResultSubpartition.BufferAndBacklog, Integer>> cachedBuffers =
new LinkedList<>();
+ /**
Review Comment:
Though `cachedBuffers` is not introduced by this PR, I also left a comment
here. The cached buffers should be recycled whenever an exception occurs.
In `releaseAllResources`, I think all the `cachedBuffers` should also be
recycled to avoid leaking buffers.
Also, I am not sure whether it is necessary to clear these collections
(e.g., `unregisteredAvailableViews`) in `releaseAllResources` as well.
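To illustrate the suggestion, here is a minimal sketch of draining and recycling cached buffers on release. It is not the actual `UnionResultSubpartitionView` code: `SketchBuffer`, `SketchUnionView`, `cache`, and `markAvailableBeforeRegistration` are hypothetical stand-ins, the queue holds plain buffers instead of `Tuple2<BufferAndBacklog, Integer>`, and synchronization is left out.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for a pooled network buffer; not Flink's Buffer class. */
class SketchBuffer {
    private boolean recycled;

    /** Returns the buffer to its (imaginary) pool. */
    void recycle() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical stand-in for a view that caches buffers and tracks early-notified views. */
class SketchUnionView {
    private final Queue<SketchBuffer> cachedBuffers = new ArrayDeque<>();
    private final Set<Object> unregisteredAvailableViews = new HashSet<>();
    private boolean released;

    /** Caches a buffer, or recycles it right away if the view is already released. */
    void cache(SketchBuffer buffer) {
        if (released) {
            buffer.recycle();
            return;
        }
        cachedBuffers.add(buffer);
    }

    /** Records a view that signalled data before it was registered. */
    void markAvailableBeforeRegistration(Object view) {
        unregisteredAvailableViews.add(view);
    }

    /** Drains the cache so no buffer leaks, then clears the bookkeeping set. */
    void releaseAllResources() {
        released = true;
        SketchBuffer buffer;
        while ((buffer = cachedBuffers.poll()) != null) {
            buffer.recycle();
        }
        unregisteredAvailableViews.clear();
    }

    int numCachedBuffers() {
        return cachedBuffers.size();
    }

    int numPendingViews() {
        return unregisteredAvailableViews.size();
    }
}
```

The same drain loop could be called from the exception path, so that a failure while polling child views does not leave buffers in the queue. Whether clearing `unregisteredAvailableViews` is needed is a separate question; the sketch clears it only to show where that would go.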
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]