This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9da4069821a3ca6d06add3eab06b0af38f43112a
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Aug 20 19:14:41 2021 +0200

    [FLINK-9925][tests] Harden ClientTest by making handler shareable
    
    This commit makes the handler that is used by the testConcurrentQueries
    test shareable so that Netty won't fail if another pipeline is created.
---
 .../flink/queryablestate/network/ClientTest.java   | 129 +++++++++------------
 1 file changed, 56 insertions(+), 73 deletions(-)

diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
index 80134fc..7516d76 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/ClientTest.java
@@ -82,13 +82,11 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /** Tests for {@link Client}. */
@@ -136,20 +134,7 @@ public class ClientTest extends TestLogger {
             final AtomicReference<Channel> channel = new AtomicReference<>();
 
             serverChannel =
-                    createServerChannel(
-                            new ChannelInboundHandlerAdapter() {
-                                @Override
-                                public void 
channelActive(ChannelHandlerContext ctx)
-                                        throws Exception {
-                                    channel.set(ctx.channel());
-                                }
-
-                                @Override
-                                public void channelRead(ChannelHandlerContext 
ctx, Object msg)
-                                        throws Exception {
-                                    received.add((ByteBuf) msg);
-                                }
-                            });
+                    createServerChannel(new 
ChannelDataCollectingHandler(channel, received));
 
             InetSocketAddress serverAddress = 
getKvStateServerAddress(serverChannel);
 
@@ -326,30 +311,7 @@ public class ClientTest extends TestLogger {
             client = new Client<>("Test Client", 1, serializer, stats);
 
             serverChannel =
-                    createServerChannel(
-                            new ChannelInboundHandlerAdapter() {
-                                @Override
-                                public void channelRead(ChannelHandlerContext 
ctx, Object msg)
-                                        throws Exception {
-                                    ByteBuf buf = (ByteBuf) msg;
-                                    assertEquals(
-                                            MessageType.REQUEST,
-                                            
MessageSerializer.deserializeHeader(buf));
-                                    long requestId = 
MessageSerializer.getRequestId(buf);
-                                    KvStateInternalRequest request =
-                                            serializer.deserializeRequest(buf);
-
-                                    buf.release();
-
-                                    KvStateResponse response =
-                                            new 
KvStateResponse(serializedResult);
-                                    ByteBuf serResponse =
-                                            
MessageSerializer.serializeResponse(
-                                                    ctx.alloc(), requestId, 
response);
-
-                                    ctx.channel().writeAndFlush(serResponse);
-                                }
-                            });
+                    createServerChannel(new 
RespondingChannelHandler(serializer, serializedResult));
 
             final InetSocketAddress serverAddress = 
getKvStateServerAddress(serverChannel);
 
@@ -437,20 +399,7 @@ public class ClientTest extends TestLogger {
             final AtomicReference<Channel> channel = new AtomicReference<>();
 
             serverChannel =
-                    createServerChannel(
-                            new ChannelInboundHandlerAdapter() {
-                                @Override
-                                public void 
channelActive(ChannelHandlerContext ctx)
-                                        throws Exception {
-                                    channel.set(ctx.channel());
-                                }
-
-                                @Override
-                                public void channelRead(ChannelHandlerContext 
ctx, Object msg)
-                                        throws Exception {
-                                    received.add((ByteBuf) msg);
-                                }
-                            });
+                    createServerChannel(new 
ChannelDataCollectingHandler(channel, received));
 
             InetSocketAddress serverAddress = 
getKvStateServerAddress(serverChannel);
 
@@ -550,24 +499,11 @@ public class ClientTest extends TestLogger {
         try {
             client = new Client<>("Test Client", 1, serializer, stats);
 
-            final AtomicBoolean received = new AtomicBoolean();
+            final LinkedBlockingQueue<ByteBuf> received = new 
LinkedBlockingQueue<>();
             final AtomicReference<Channel> channel = new AtomicReference<>();
 
             serverChannel =
-                    createServerChannel(
-                            new ChannelInboundHandlerAdapter() {
-                                @Override
-                                public void 
channelActive(ChannelHandlerContext ctx)
-                                        throws Exception {
-                                    channel.set(ctx.channel());
-                                }
-
-                                @Override
-                                public void channelRead(ChannelHandlerContext 
ctx, Object msg)
-                                        throws Exception {
-                                    received.set(true);
-                                }
-                            });
+                    createServerChannel(new 
ChannelDataCollectingHandler(channel, received));
 
             InetSocketAddress serverAddress = 
getKvStateServerAddress(serverChannel);
 
@@ -576,10 +512,7 @@ public class ClientTest extends TestLogger {
                     new KvStateInternalRequest(new KvStateID(), new byte[0]);
             Future<KvStateResponse> future = client.sendRequest(serverAddress, 
request);
 
-            while (!received.get()) {
-                Thread.sleep(50L);
-            }
-            assertTrue("Receive timed out", received.get());
+            received.take();
 
             assertEquals(1, stats.getNumConnections());
 
@@ -881,4 +814,54 @@ public class ClientTest extends TestLogger {
     private InetSocketAddress getKvStateServerAddress(Channel serverChannel) {
         return (InetSocketAddress) serverChannel.localAddress();
     }
+
+    private static class ChannelDataCollectingHandler extends 
ChannelInboundHandlerAdapter {
+        private final AtomicReference<Channel> channel;
+        private final LinkedBlockingQueue<ByteBuf> received;
+
+        private ChannelDataCollectingHandler(
+                AtomicReference<Channel> channel, LinkedBlockingQueue<ByteBuf> 
received) {
+            this.channel = channel;
+            this.received = received;
+        }
+
+        @Override
+        public void channelActive(ChannelHandlerContext ctx) {
+            channel.set(ctx.channel());
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            received.add((ByteBuf) msg);
+        }
+    }
+
+    @ChannelHandler.Sharable
+    private static final class RespondingChannelHandler extends 
ChannelInboundHandlerAdapter {
+        private final MessageSerializer<KvStateInternalRequest, 
KvStateResponse> serializer;
+        private final byte[] serializedResult;
+
+        private RespondingChannelHandler(
+                MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer,
+                byte[] serializedResult) {
+            this.serializer = serializer;
+            this.serializedResult = serializedResult;
+        }
+
+        @Override
+        public void channelRead(ChannelHandlerContext ctx, Object msg) {
+            ByteBuf buf = (ByteBuf) msg;
+            assertEquals(MessageType.REQUEST, 
MessageSerializer.deserializeHeader(buf));
+            long requestId = MessageSerializer.getRequestId(buf);
+            KvStateInternalRequest request = 
serializer.deserializeRequest(buf);
+
+            buf.release();
+
+            KvStateResponse response = new KvStateResponse(serializedResult);
+            ByteBuf serResponse =
+                    MessageSerializer.serializeResponse(ctx.alloc(), 
requestId, response);
+
+            ctx.channel().writeAndFlush(serResponse);
+        }
+    }
 }

Reply via email to