This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 774cafa Fix flake in DiscoveryServiceTest (#1081) (#2406) 774cafa is described below commit 774cafa25d6340c778c5c281045446086aff96a0 Author: Ivan Kelly <iv...@apache.org> AuthorDate: Tue Aug 28 07:30:42 2018 +0200 Fix flake in DiscoveryServiceTest (#1081) (#2406) This test was flaking because it was only waiting for 1 second for connection and message exchange to complete, which is not enough time when there's heavy load on the machine (simulated with stress-ng). The fix is to increase the timeout to 10 seconds. I've also cleaned up the test to use a CompletableFuture rather than a CountDownLatch so that the test thread can be notified of failures in the handlers. --- .../discovery/service/DiscoveryServiceTest.java | 77 ++++++++++++---------- 1 file changed, 43 insertions(+), 34 deletions(-) diff --git a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java index 7d4a5ee..24bcd6a 100644 --- a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java +++ b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java @@ -21,7 +21,6 @@ package org.apache.pulsar.discovery.service; import static org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotEquals; -import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; import java.lang.reflect.Field; @@ -31,16 +30,17 @@ import java.net.URISyntaxException; import java.security.PrivateKey; import java.security.cert.X509Certificate; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import 
java.util.concurrent.TimeUnit; import org.apache.bookkeeper.util.ZkUtils; import org.apache.pulsar.common.api.Commands; +import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; import org.apache.pulsar.common.util.ObjectMapperFactory; import org.apache.pulsar.common.util.SecurityUtility; +import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream; import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader; import org.apache.pulsar.policies.data.loadbalancer.LoadReport; import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache; @@ -66,8 +66,13 @@ import io.netty.handler.ssl.SslContext; import io.netty.handler.ssl.SslContextBuilder; import io.netty.handler.ssl.util.InsecureTrustManagerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { + private static final Logger log = LoggerFactory.getLogger(DiscoveryServiceTest.class); + private final static String TLS_CLIENT_CERT_FILE_PATH = "./src/test/resources/certificate/client.crt"; private final static String TLS_CLIENT_KEY_FILE_PATH = "./src/test/resources/certificate/client.key"; @@ -120,48 +125,40 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { /** * It verifies: client connects to Discovery-service and receives discovery response successfully. - * + * * @throws Exception */ @Test public void testClientServerConnection() throws Exception { addBrokerToZk(2); - // 1. client connects to DiscoveryService, 2. 
Client receive service-lookup response - final int messageTransfer = 2; - final CountDownLatch latch = new CountDownLatch(messageTransfer); - NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), latch, false); - try { - assertTrue(latch.await(1, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - fail("should have received lookup response message from server", e); - } + + final CompletableFuture<BaseCommand> promise = new CompletableFuture<>(); + NioEventLoopGroup workerGroup = connectToService(service.getServiceUrl(), promise, false); + assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED); workerGroup.shutdownGracefully(); } @Test public void testClientServerConnectionTls() throws Exception { addBrokerToZk(2); - // 1. client connects to DiscoveryService, 2. Client receive service-lookup response - final int messageTransfer = 2; - final CountDownLatch latch = new CountDownLatch(messageTransfer); - NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), latch, true); - try { - assertTrue(latch.await(1, TimeUnit.SECONDS)); - } catch (InterruptedException e) { - fail("should have received lookup response message from server", e); - } + + final CompletableFuture<BaseCommand> promise = new CompletableFuture<>(); + NioEventLoopGroup workerGroup = connectToService(service.getServiceUrlTls(), promise, true); + assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), BaseCommand.Type.CONNECTED); workerGroup.shutdownGracefully(); } /** * creates ClientHandler channel to connect and communicate with server - * + * * @param serviceUrl * @param latch * @return * @throws URISyntaxException */ - public static NioEventLoopGroup connectToService(String serviceUrl, CountDownLatch latch, boolean tls) + public static NioEventLoopGroup connectToService(String serviceUrl, + CompletableFuture<BaseCommand> promise, + boolean tls) throws URISyntaxException { NioEventLoopGroup workerGroup = new 
NioEventLoopGroup(); Bootstrap b = new Bootstrap(); @@ -181,14 +178,14 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { SslContext sslCtx = builder.build(); ch.pipeline().addLast("tls", sslCtx.newHandler(ch.alloc())); } - ch.pipeline().addLast(new ClientHandler(latch)); + ch.pipeline().addLast(new ClientHandler(promise)); } }); URI uri = new URI(serviceUrl); InetSocketAddress serviceAddress = new InetSocketAddress(uri.getHost(), uri.getPort()); b.connect(serviceAddress).addListener((ChannelFuture future) -> { if (!future.isSuccess()) { - throw new IllegalStateException(future.cause()); + promise.completeExceptionally(future.cause()); } }); return workerGroup; @@ -196,24 +193,37 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { static class ClientHandler extends ChannelInboundHandlerAdapter { - final CountDownLatch latch; + final CompletableFuture<BaseCommand> promise; - public ClientHandler(CountDownLatch latch) { - this.latch = latch; + public ClientHandler(CompletableFuture<BaseCommand> promise) { + this.promise = promise; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { - ByteBuf buffer = (ByteBuf) msg; - buffer.release(); - latch.countDown(); + try { + ByteBuf buffer = (ByteBuf) msg; + buffer.readUnsignedInt(); // discard frame length + int cmdSize = (int) buffer.readUnsignedInt(); + buffer.writerIndex(buffer.readerIndex() + cmdSize); + ByteBufCodedInputStream cmdInputStream = ByteBufCodedInputStream.get(buffer); + BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder(); + BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, null).build(); + + cmdInputStream.recycle(); + cmdBuilder.recycle(); + buffer.release(); + + promise.complete(cmd); + } catch (Exception e) { + promise.completeExceptionally(e); + } ctx.close(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { - // Close the connection when an exception is raised. 
- cause.printStackTrace(); + promise.completeExceptionally(cause); ctx.close(); } @@ -221,7 +231,6 @@ public class DiscoveryServiceTest extends BaseDiscoveryTestSetup { public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.writeAndFlush(Commands.newConnect("", "", null)); - latch.countDown(); } }