This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e7854193816dc348086423b42d4dff12dca4a80e Author: 1996fanrui <[email protected]> AuthorDate: Sun Nov 20 15:57:58 2022 +0800 [FLINK-28695][network] Fix the bug where the old netty client isn't closed when the netty server closes the channel and there is no input channel --- .../CreditBasedPartitionRequestClientHandler.java | 28 ++++---- .../netty/PartitionRequestClientFactoryTest.java | 74 ++++++++++++++++++++++ 2 files changed, 86 insertions(+), 16 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java index 5e8ab08c423..d03abecd7f2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java @@ -124,22 +124,18 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { - // Unexpected close. In normal operation, the client closes the connection after all input - // channels have been removed. This indicates a problem with the remote task manager. - if (!inputChannels.isEmpty()) { - final SocketAddress remoteAddr = ctx.channel().remoteAddress(); - - notifyAllChannelsOfErrorAndClose( - new RemoteTransportException( - "Connection unexpectedly closed by remote task manager '" - + remoteAddr - + " [ " - + connectionID.getResourceID().getStringWithMetadata() - + " ] " - + "'. 
" - + "This might indicate that the remote task manager was lost.", - remoteAddr)); - } + final SocketAddress remoteAddr = ctx.channel().remoteAddress(); + + notifyAllChannelsOfErrorAndClose( + new RemoteTransportException( + "Connection unexpectedly closed by remote task manager '" + + remoteAddr + + " [ " + + connectionID.getResourceID().getStringWithMetadata() + + " ] " + + "'. " + + "This might indicate that the remote task manager was lost.", + remoteAddr)); super.channelInactive(ctx); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index 56110d95cbc..d923d76a6c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -27,9 +27,12 @@ import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTe import org.apache.flink.testutils.junit.extensions.parameterized.Parameters; import org.apache.flink.util.TestLogger; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; import org.junit.jupiter.api.TestTemplate; import org.junit.jupiter.api.extension.ExtendWith; @@ -48,6 +51,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import static org.apache.flink.runtime.io.network.netty.NettyTestUtil.initServerAndClient; import static 
org.apache.flink.runtime.io.network.netty.NettyTestUtil.shutdown; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -162,6 +166,56 @@ public class PartitionRequestClientFactoryTest extends TestLogger { assertThat(set.size()).isLessThanOrEqualTo(maxNumberOfConnections); } + /** + * Verify that the old netty client is not reused, i.e. the factory creates a new client, when + * the netty server closes the channel and there is no input channel. + */ + @TestTemplate + void testConnectionReuseWhenRemoteCloseAndNoInputChannel() throws Exception { + CompletableFuture<Void> inactiveFuture = new CompletableFuture<>(); + CompletableFuture<Channel> serverChannelFuture = new CompletableFuture<>(); + NettyProtocol protocol = + new NettyProtocol(null, null) { + @Override + public ChannelHandler[] getServerChannelHandlers() { + return new ChannelHandler[] { + // Capture the server-side channel once it is registered + new ChannelInboundHandlerAdapter() { + @Override + public void channelRegistered(ChannelHandlerContext ctx) + throws Exception { + super.channelRegistered(ctx); + serverChannelFuture.complete(ctx.channel()); + } + } + }; + } + + @Override + public ChannelHandler[] getClientChannelHandlers() { + return new ChannelHandler[] { + new ChannelInactiveFutureHandler(inactiveFuture) + }; + } + }; + NettyTestUtil.NettyServerAndClient serverAndClient = initServerAndClient(protocol); + + PartitionRequestClientFactory factory = + new PartitionRequestClientFactory( + serverAndClient.client(), 2, 1, connectionReuseEnabled); + + ConnectionID connectionID = serverAndClient.getConnectionID(RESOURCE_ID, 0); + NettyPartitionRequestClient oldClient = factory.createPartitionRequestClient(connectionID); + + // close server channel + Channel channel = serverChannelFuture.get(); + channel.close(); + inactiveFuture.get(); + NettyPartitionRequestClient newClient = factory.createPartitionRequestClient(connectionID); + assertThat(newClient).as("Factory should create a new client.").isNotSameAs(oldClient); 
shutdown(serverAndClient); + } + @TestTemplate void testNettyClientConnectRetry() throws Exception { NettyTestUtil.NettyServerAndClient serverAndClient = createNettyServerAndClient(); @@ -344,4 +398,24 @@ public class PartitionRequestClientFactoryTest extends TestLogger { } } } + + private static class ChannelInactiveFutureHandler + extends CreditBasedPartitionRequestClientHandler { + + private final CompletableFuture<Void> inactiveFuture; + + private ChannelInactiveFutureHandler(CompletableFuture<Void> inactiveFuture) { + this.inactiveFuture = inactiveFuture; + } + + @Override + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + super.channelInactive(ctx); + inactiveFuture.complete(null); + } + + public CompletableFuture<Void> getInactiveFuture() { + return inactiveFuture; + } + } }
