This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9da4069821a3ca6d06add3eab06b0af38f43112a Author: Till Rohrmann <[email protected]> AuthorDate: Fri Aug 20 19:14:41 2021 +0200 [FLINK-9925][tests] Harden ClientTest by making handler shareable This commit makes the handler that is used for the testConcurrentQueries shareable so that Netty won't fail if another pipeline is created. --- .../flink/queryablestate/network/ClientTest.java | 129 +++++++++------------ 1 file changed, 56 insertions(+), 73 deletions(-) diff --git a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java index 80134fc..7516d76 100644 --- a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java +++ b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java @@ -82,13 +82,11 @@ import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** Tests for {@link Client}. */ @@ -136,20 +134,7 @@ public class ClientTest extends TestLogger { final AtomicReference<Channel> channel = new AtomicReference<>(); serverChannel = - createServerChannel( - new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - received.add((ByteBuf) msg); - } - }); + createServerChannel(new ChannelDataCollectingHandler(channel, received)); InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel); @@ -326,30 +311,7 @@ public class ClientTest extends TestLogger { client = new Client<>("Test Client", 1, serializer, stats); serverChannel = - createServerChannel( - new ChannelInboundHandlerAdapter() { - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - ByteBuf buf = (ByteBuf) msg; - assertEquals( - MessageType.REQUEST, - MessageSerializer.deserializeHeader(buf)); - long requestId = MessageSerializer.getRequestId(buf); - KvStateInternalRequest request = - serializer.deserializeRequest(buf); - - buf.release(); - - KvStateResponse response = - new KvStateResponse(serializedResult); - ByteBuf serResponse = - MessageSerializer.serializeResponse( - ctx.alloc(), requestId, response); - - ctx.channel().writeAndFlush(serResponse); - } - }); + createServerChannel(new RespondingChannelHandler(serializer, serializedResult)); final InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel); @@ -437,20 +399,7 @@ public class ClientTest extends TestLogger { final AtomicReference<Channel> channel = new AtomicReference<>(); serverChannel = - createServerChannel( - new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - received.add((ByteBuf) msg); - } - }); + createServerChannel(new ChannelDataCollectingHandler(channel, received)); InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel); @@ -550,24 +499,11 @@ public class ClientTest extends TestLogger { try { client = new Client<>("Test Client", 1, serializer, stats); - final AtomicBoolean received = new AtomicBoolean(); + final LinkedBlockingQueue<ByteBuf> received = new LinkedBlockingQueue<>(); final AtomicReference<Channel> channel = new AtomicReference<>(); serverChannel = - createServerChannel( - new ChannelInboundHandlerAdapter() { - @Override - public void channelActive(ChannelHandlerContext ctx) - throws Exception { - channel.set(ctx.channel()); - } - - @Override - public void channelRead(ChannelHandlerContext ctx, Object msg) - throws Exception { - received.set(true); - } - }); + createServerChannel(new ChannelDataCollectingHandler(channel, received)); InetSocketAddress serverAddress = getKvStateServerAddress(serverChannel); @@ -576,10 +512,7 @@ public class ClientTest extends TestLogger { new KvStateInternalRequest(new KvStateID(), new byte[0]); Future<KvStateResponse> future = client.sendRequest(serverAddress, request); - while (!received.get()) { - Thread.sleep(50L); - } - assertTrue("Receive timed out", received.get()); + received.take(); assertEquals(1, stats.getNumConnections()); @@ -881,4 +814,54 @@ public class ClientTest extends TestLogger { private InetSocketAddress getKvStateServerAddress(Channel serverChannel) { return (InetSocketAddress) serverChannel.localAddress(); } + + private static class ChannelDataCollectingHandler extends ChannelInboundHandlerAdapter { + private final AtomicReference<Channel> channel; + private final LinkedBlockingQueue<ByteBuf> received; + + private ChannelDataCollectingHandler( + AtomicReference<Channel> channel, LinkedBlockingQueue<ByteBuf> received) { + this.channel = channel; + this.received = received; + } + + @Override + public void channelActive(ChannelHandlerContext ctx) { + channel.set(ctx.channel()); + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + received.add((ByteBuf) msg); + } + } + + @ChannelHandler.Sharable + private static final class RespondingChannelHandler extends ChannelInboundHandlerAdapter { + private final MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer; + private final byte[] serializedResult; + + private RespondingChannelHandler( + MessageSerializer<KvStateInternalRequest, KvStateResponse> serializer, + byte[] serializedResult) { + this.serializer = serializer; + this.serializedResult = serializedResult; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf buf = (ByteBuf) msg; + assertEquals(MessageType.REQUEST, MessageSerializer.deserializeHeader(buf)); + long requestId = MessageSerializer.getRequestId(buf); + KvStateInternalRequest request = serializer.deserializeRequest(buf); + + buf.release(); + + KvStateResponse response = new KvStateResponse(serializedResult); + ByteBuf serResponse = + MessageSerializer.serializeResponse(ctx.alloc(), requestId, response); + + ctx.channel().writeAndFlush(serResponse); + } + } }
