dawidwys commented on a change in pull request #15633:
URL: https://github.com/apache/flink/pull/15633#discussion_r614590827

##########
File path: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
##########
@@ -73,8 +76,10 @@ public NetworkFailureHandler(
     /** Closes the specified channel after all queued write requests are flushed. */
     static void closeOnFlush(Channel channel) {
-        if (channel.isConnected()) {
+        if (channel.isConnected() && !channelsBeingClosed.containsKey(channel)) {
+            channelsBeingClosed.put(channel, channel);
             channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+            channelsBeingClosed.remove(channel);

Review comment:
nit: I see. Your explanation makes sense. Could we then make it more explicit that the only purpose of `channelsBeingClosed` is to prevent recursive calls to `closeOnFlush`? E.g. could we move the variable right above `closeOnFlush`?

Moreover, how about we rephrase the comment to:
```
// The set of channels that are being closed in closeOnFlush. This is needed to prevent closing a channel recursively, see FLINK-22085
```

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
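
For illustration only, the re-entrancy guard discussed in the review comment can be sketched without Netty roughly as follows. `CloseGuardSketch`, `FakeChannel`, `setOnWrite`, and `writeEmptyAndClose` are hypothetical stand-ins and not part of Flink or Netty; the sketch also assumes a `Set` where the diff uses a map, and it models the write as a synchronous call that may re-enter `closeOnFlush`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public final class CloseGuardSketch {

    /** Hypothetical stand-in for a channel; this is NOT the Netty Channel API. */
    static final class FakeChannel {
        private boolean connected = true;
        private Runnable onWrite = () -> {};
        int flushCount = 0;

        boolean isConnected() {
            return connected;
        }

        void setOnWrite(Runnable callback) {
            this.onWrite = callback;
        }

        /** Simulates "write an empty buffer, then close", invoking a callback in between. */
        void writeEmptyAndClose() {
            flushCount++;
            onWrite.run(); // a handler may call closeOnFlush again from here
            connected = false;
        }
    }

    // The set of channels that are being closed in closeOnFlush. Its only purpose
    // is to prevent closing a channel recursively. Declared right above the method
    // that uses it, as the review suggests.
    private static final Set<FakeChannel> channelsBeingClosed = ConcurrentHashMap.newKeySet();

    static void closeOnFlush(FakeChannel channel) {
        // Set.add returns false when the element is already present, so the
        // "already being closed?" check and the registration happen in one step.
        if (channel.isConnected() && channelsBeingClosed.add(channel)) {
            try {
                channel.writeEmptyAndClose();
            } finally {
                channelsBeingClosed.remove(channel);
            }
        }
    }

    public static void main(String[] args) {
        FakeChannel ch = new FakeChannel();
        // The callback re-enters closeOnFlush for the same channel.
        ch.setOnWrite(() -> closeOnFlush(ch));
        closeOnFlush(ch);
        System.out.println("writes issued: " + ch.flushCount);
    }
}
```

In this sketch the nested call returns immediately because the channel is already registered, so only one write is issued. The `try`/`finally` is a design choice of the sketch, ensuring the entry is removed even if the write throws.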