This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 774cafa  Fix flake in DiscoveryServiceTest (#1081) (#2406)
774cafa is described below

commit 774cafa25d6340c778c5c281045446086aff96a0
Author: Ivan Kelly <iv...@apache.org>
AuthorDate: Tue Aug 28 07:30:42 2018 +0200

    Fix flake in DiscoveryServiceTest (#1081) (#2406)
    
    This test was flaking because it was only waiting for 1 second for
    connection and message exchange to complete, which is not enough time
    when there's heavy load on the machine (simulated with stress-ng).
    
    The fix is to increase the timeout to 10 seconds. I've also cleaned up
    the test to use a CompletableFuture rather than a CountDownLatch so
    that the test thread can be notified of failures in the handlers.
---
 .../discovery/service/DiscoveryServiceTest.java    | 77 ++++++++++++----------
 1 file changed, 43 insertions(+), 34 deletions(-)

diff --git 
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
 
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index 7d4a5ee..24bcd6a 100644
--- 
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++ 
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.discovery.service;
 import static 
org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
 
 import java.lang.reflect.Field;
@@ -31,16 +30,17 @@ import java.net.URISyntaxException;
 import java.security.PrivateKey;
 import java.security.cert.X509Certificate;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
 import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader;
 import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
 import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
@@ -66,8 +66,13 @@ import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslContextBuilder;
 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
 
+    private static final Logger log = 
LoggerFactory.getLogger(DiscoveryServiceTest.class);
+
     private final static String TLS_CLIENT_CERT_FILE_PATH = 
"./src/test/resources/certificate/client.crt";
     private final static String TLS_CLIENT_KEY_FILE_PATH = 
"./src/test/resources/certificate/client.key";
 
@@ -120,48 +125,40 @@ public class DiscoveryServiceTest extends 
BaseDiscoveryTestSetup {
 
     /**
      * It verifies: client connects to Discovery-service and receives 
discovery response successfully.
-     * 
+     *
      * @throws Exception
      */
     @Test
     public void testClientServerConnection() throws Exception {
         addBrokerToZk(2);
-        // 1. client connects to DiscoveryService, 2. Client receive 
service-lookup response
-        final int messageTransfer = 2;
-        final CountDownLatch latch = new CountDownLatch(messageTransfer);
-        NioEventLoopGroup workerGroup = 
connectToService(service.getServiceUrl(), latch, false);
-        try {
-            assertTrue(latch.await(1, TimeUnit.SECONDS));
-        } catch (InterruptedException e) {
-            fail("should have received lookup response message from server", 
e);
-        }
+
+        final CompletableFuture<BaseCommand> promise = new 
CompletableFuture<>();
+        NioEventLoopGroup workerGroup = 
connectToService(service.getServiceUrl(), promise, false);
+        assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), 
BaseCommand.Type.CONNECTED);
         workerGroup.shutdownGracefully();
     }
 
     @Test
     public void testClientServerConnectionTls() throws Exception {
         addBrokerToZk(2);
-        // 1. client connects to DiscoveryService, 2. Client receive 
service-lookup response
-        final int messageTransfer = 2;
-        final CountDownLatch latch = new CountDownLatch(messageTransfer);
-        NioEventLoopGroup workerGroup = 
connectToService(service.getServiceUrlTls(), latch, true);
-        try {
-            assertTrue(latch.await(1, TimeUnit.SECONDS));
-        } catch (InterruptedException e) {
-            fail("should have received lookup response message from server", 
e);
-        }
+
+        final CompletableFuture<BaseCommand> promise = new 
CompletableFuture<>();
+        NioEventLoopGroup workerGroup = 
connectToService(service.getServiceUrlTls(), promise, true);
+        assertEquals(promise.get(10, TimeUnit.SECONDS).getType(), 
BaseCommand.Type.CONNECTED);
         workerGroup.shutdownGracefully();
     }
 
     /**
      * creates ClientHandler channel to connect and communicate with server
-     * 
+     *
      * @param serviceUrl
      * @param latch
      * @return
      * @throws URISyntaxException
      */
-    public static NioEventLoopGroup connectToService(String serviceUrl, 
CountDownLatch latch, boolean tls)
+    public static NioEventLoopGroup connectToService(String serviceUrl,
+                                                     
CompletableFuture<BaseCommand> promise,
+                                                     boolean tls)
             throws URISyntaxException {
         NioEventLoopGroup workerGroup = new NioEventLoopGroup();
         Bootstrap b = new Bootstrap();
@@ -181,14 +178,14 @@ public class DiscoveryServiceTest extends 
BaseDiscoveryTestSetup {
                     SslContext sslCtx = builder.build();
                     ch.pipeline().addLast("tls", 
sslCtx.newHandler(ch.alloc()));
                 }
-                ch.pipeline().addLast(new ClientHandler(latch));
+                ch.pipeline().addLast(new ClientHandler(promise));
             }
         });
         URI uri = new URI(serviceUrl);
         InetSocketAddress serviceAddress = new 
InetSocketAddress(uri.getHost(), uri.getPort());
         b.connect(serviceAddress).addListener((ChannelFuture future) -> {
             if (!future.isSuccess()) {
-                throw new IllegalStateException(future.cause());
+                promise.completeExceptionally(future.cause());
             }
         });
         return workerGroup;
@@ -196,24 +193,37 @@ public class DiscoveryServiceTest extends 
BaseDiscoveryTestSetup {
 
     static class ClientHandler extends ChannelInboundHandlerAdapter {
 
-        final CountDownLatch latch;
+        final CompletableFuture<BaseCommand> promise;
 
-        public ClientHandler(CountDownLatch latch) {
-            this.latch = latch;
+        public ClientHandler(CompletableFuture<BaseCommand> promise) {
+            this.promise = promise;
         }
 
         @Override
         public void channelRead(ChannelHandlerContext ctx, Object msg) throws 
Exception {
-            ByteBuf buffer = (ByteBuf) msg;
-            buffer.release();
-            latch.countDown();
+            try {
+                ByteBuf buffer = (ByteBuf) msg;
+                buffer.readUnsignedInt(); // discard frame length
+                int cmdSize = (int) buffer.readUnsignedInt();
+                buffer.writerIndex(buffer.readerIndex() + cmdSize);
+                ByteBufCodedInputStream cmdInputStream = 
ByteBufCodedInputStream.get(buffer);
+                BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
+                BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream, 
null).build();
+
+                cmdInputStream.recycle();
+                cmdBuilder.recycle();
+                buffer.release();
+
+                promise.complete(cmd);
+            } catch (Exception e) {
+                promise.completeExceptionally(e);
+            }
             ctx.close();
         }
 
         @Override
         public void exceptionCaught(ChannelHandlerContext ctx, Throwable 
cause) {
-            // Close the connection when an exception is raised.
-            cause.printStackTrace();
+            promise.completeExceptionally(cause);
             ctx.close();
         }
 
@@ -221,7 +231,6 @@ public class DiscoveryServiceTest extends 
BaseDiscoveryTestSetup {
         public void channelActive(ChannelHandlerContext ctx) throws Exception {
             super.channelActive(ctx);
             ctx.writeAndFlush(Commands.newConnect("", "", null));
-            latch.countDown();
         }
 
     }

Reply via email to