dawidwys commented on a change in pull request #15633:
URL: https://github.com/apache/flink/pull/15633#discussion_r614590827
##########
File path:
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
##########
@@ -73,8 +76,10 @@ public NetworkFailureHandler(
/** Closes the specified channel after all queued write requests are flushed. */
static void closeOnFlush(Channel channel) {
- if (channel.isConnected()) {
+ if (channel.isConnected() && !channelsBeingClosed.containsKey(channel)) {
+ channelsBeingClosed.put(channel, channel);
channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+ channelsBeingClosed.remove(channel);
Review comment:
nit: I see. Your explanation makes sense. Could we then make it more explicit
that the only purpose of `channelsBeingClosed` is to prevent recursive calls
to `closeOnFlush`? E.g. could we move the variable right above
`closeOnFlush`?
Moreover, how about we rephrase the comment to:
```
// The set of channels that are being closed in closeOnFlush. This is needed
// to prevent closing a channel recursively; see FLINK-22085.
```
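To illustrate the intent, here is a minimal, self-contained sketch of such a re-entrancy guard. It is not the actual Flink or Netty code: `FakeChannel`, `writeEmptyAndClose`, and `guardedEntries` are hypothetical stand-ins, and the synchronous re-entrant call only assumes that handling the write can end up calling `closeOnFlush` again for the same channel.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class CloseOnFlushGuardSketch {

    /** Hypothetical stand-in for a channel; not a real Netty type. */
    static final class FakeChannel {
        boolean connected = true;
        int guardedEntries = 0; // how often the guarded body of closeOnFlush ran
        int closeCount = 0;     // how often the channel was actually closed

        /** Simulates a write whose handling synchronously re-enters closeOnFlush. */
        void writeEmptyAndClose() {
            // Assumed re-entrant call, e.g. triggered by a failure handler.
            closeOnFlush(this);
            connected = false;
            closeCount++;
        }
    }

    // The set of channels that are being closed in closeOnFlush. This is needed
    // to prevent closing a channel recursively.
    private static final Set<FakeChannel> channelsBeingClosed =
            ConcurrentHashMap.newKeySet();

    static void closeOnFlush(FakeChannel channel) {
        // add() returns false if the channel is already being closed,
        // so the check and the registration happen in one step.
        if (channel.connected && channelsBeingClosed.add(channel)) {
            try {
                channel.guardedEntries++;
                channel.writeEmptyAndClose();
            } finally {
                // Always unregister, even if the write throws.
                channelsBeingClosed.remove(channel);
            }
        }
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        closeOnFlush(channel);
        System.out.println("guarded entries: " + channel.guardedEntries);
        System.out.println("close count: " + channel.closeCount);
    }
}
```

Without the guard, the nested call in this sketch would recurse until the stack overflows. Keeping the set declared right above `closeOnFlush` makes it obvious that nothing else is supposed to use it.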