TanYuxin-tyx commented on code in PR #24151:
URL: https://github.com/apache/flink/pull/24151#discussion_r1467398883


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -71,18 +73,35 @@ public class UnionResultSubpartitionView
     private final Queue<Tuple2<ResultSubpartition.BufferAndBacklog, Integer>> cachedBuffers =
             new LinkedList<>();
 
+    /**
+     * A collection storing views that have triggered {@link
+     * #notifyDataAvailable(ResultSubpartitionView)} without {@link #notifyViewCreated(int,
+     * ResultSubpartitionView)}. This is used to resolve the race condition between these two
+     * methods.
+     */
+    private final Set<ResultSubpartitionView> unregisteredAvailableViews = new HashSet<>();
+
+    private final int totalNumViews;

Review Comment:
   `numTotalViews` may be better here.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/UnionResultSubpartitionView.java:
##########
@@ -71,18 +73,35 @@ public class UnionResultSubpartitionView
     private final Queue<Tuple2<ResultSubpartition.BufferAndBacklog, Integer>> cachedBuffers =
             new LinkedList<>();
 
+    /**

Review Comment:
   Although `cachedBuffers` was not introduced by this PR, I am leaving a comment 
here anyway. These buffers should be recycled whenever an exception occurs.
   Likewise, in `releaseAllResources`, I think all of the `cachedBuffers` should 
be recycled to avoid leaking buffers.
   
   Also, in `releaseAllResources`, I am not sure whether it is necessary to clear 
the other collections (e.g., `unregisteredAvailableViews`) as well.
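
   To make the suggestion concrete, here is a minimal, self-contained sketch of the cleanup I have in mind. It is not the actual `UnionResultSubpartitionView` code: `FakeBuffer`, `SketchUnionView`, `cache`, `markAvailable`, `cachedCount`, and `pendingViewCount` are hypothetical stand-ins, and the queue holds plain buffers rather than `Tuple2<BufferAndBacklog, Integer>`. Clearing `unregisteredAvailableViews` is shown only as one option, since I am not sure it is required.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Hypothetical stand-in for a pooled network buffer (not Flink's Buffer class). */
class FakeBuffer {
    private boolean recycled;

    void recycleBuffer() {
        recycled = true;
    }

    boolean isRecycled() {
        return recycled;
    }
}

/** Hypothetical, simplified view that only models the cleanup path. */
class SketchUnionView {
    private final Object lock = new Object();
    private final Queue<FakeBuffer> cachedBuffers = new ArrayDeque<>();
    private final Set<Object> unregisteredAvailableViews = new HashSet<>();
    private boolean released;

    /** Caches a buffer, or recycles it right away if the view is already released. */
    void cache(FakeBuffer buffer) {
        synchronized (lock) {
            if (released) {
                buffer.recycleBuffer();
                return;
            }
            cachedBuffers.add(buffer);
        }
    }

    /** Records a view that reported data before it was registered. */
    void markAvailable(Object view) {
        synchronized (lock) {
            unregisteredAvailableViews.add(view);
        }
    }

    /** Recycles every cached buffer so none leak, then drops the bookkeeping. */
    void releaseAllResources() {
        synchronized (lock) {
            if (released) {
                return; // idempotent: a second call must not recycle twice
            }
            released = true;
            FakeBuffer buffer;
            while ((buffer = cachedBuffers.poll()) != null) {
                buffer.recycleBuffer();
            }
            // Assumption: clearing is optional; shown here for completeness.
            unregisteredAvailableViews.clear();
        }
    }

    int cachedCount() {
        synchronized (lock) {
            return cachedBuffers.size();
        }
    }

    int pendingViewCount() {
        synchronized (lock) {
            return unregisteredAvailableViews.size();
        }
    }
}
```

   The same drain loop could be reused from an exception handler, so that buffers already polled into the cache are returned to the pool before the error propagates.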



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
