dawidwys commented on a change in pull request #15633:
URL: https://github.com/apache/flink/pull/15633#discussion_r614689940



##########
File path: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
##########
@@ -73,8 +76,10 @@ public NetworkFailureHandler(
 
     /** Closes the specified channel after all queued write requests are 
flushed. */
     static void closeOnFlush(Channel channel) {
-        if (channel.isConnected()) {
+        if (channel.isConnected() && 
!channelsBeingClosed.containsKey(channel)) {
+            channelsBeingClosed.put(channel, channel);
             
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+            channelsBeingClosed.remove(channel);

Review comment:
       Actually @lindong28 I gave it a second thought and I think there is a 
problem with the current approach.
   
   The call to `channel.write(ChannelBuffers.EMPTY_BUFFER)` is an async call. 
There is no guarantee it finishes when you call 
`channelsBeingClosed.remove(channel);`. Therefore you still may end up in a 
"recursive" call that you call `closeOnFlush` because of a previous 
`closeOnFlush` call.
   
   We know that the call to `write` has finished only in the 
`ChannelFutureListener`  added via `addListener`.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to