dawidwys commented on a change in pull request #15633:
URL: https://github.com/apache/flink/pull/15633#discussion_r614590827



##########
File path: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
##########
@@ -73,8 +76,10 @@ public NetworkFailureHandler(
 
     /** Closes the specified channel after all queued write requests are 
flushed. */
     static void closeOnFlush(Channel channel) {
-        if (channel.isConnected()) {
+        if (channel.isConnected() && 
!channelsBeingClosed.containsKey(channel)) {
+            channelsBeingClosed.put(channel, channel);
             
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+            channelsBeingClosed.remove(channel);

Review comment:
       nit: I see. Your explanation makes sense. Could we make it more explicit 
then that the only purpose of `channelsBeingClosed` is to prevent recursive 
call of `closeOnFlush`? E.g. could we move the variable right above the ` 
closeOnFlush`? 
   
   Moreover how about we rephrase the comment to:
   
   ```
   // The set of channels that are being closed in closeOnFlush. This is needed 
to prevent closing a channel recursively see FLINK-22085
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to