This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 774cafa Fix flake in DiscoveryServiceTest (#1081) (#2406)
774cafa is described below
commit 774cafa25d6340c778c5c281045446086aff96a0
Author: Ivan Kelly <[email protected]>
AuthorDate: Tue Aug 28 07:30:42 2018 +0200
Fix flake in DiscoveryServiceTest (#1081) (#2406)
This test was flaking because it was only waiting for 1 second for
connection and message exchange to complete, which is not enough time
when there's heavy load on the machine (simulated with stress-ng).
The fix is to increase the timeout to 10 seconds. I've also cleaned up
the test to use a CompletableFuture rather than a CountDownLatch so
tha the test thread can be notified of failures in the handlers.
---
.../discovery/service/DiscoveryServiceTest.java | 77 ++++++++++++----------
1 file changed, 43 insertions(+), 34 deletions(-)
diff --git
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
index 7d4a5ee..24bcd6a 100644
---
a/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
+++
b/pulsar-discovery-service/src/test/java/org/apache/pulsar/discovery/service/DiscoveryServiceTest.java
@@ -21,7 +21,6 @@ package org.apache.pulsar.discovery.service;
import static
org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader.LOADBALANCE_BROKERS_ROOT;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
@@ -31,16 +30,17 @@ import java.net.URISyntaxException;
import java.security.PrivateKey;
import java.security.cert.X509Certificate;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.common.api.Commands;
+import org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.util.SecurityUtility;
+import org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream;
import org.apache.pulsar.discovery.service.web.ZookeeperCacheLoader;
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.zookeeper.ZooKeeperChildrenCache;
@@ -66,8 +66,13 @@ import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
public class DiscoveryServiceTest extends BaseDiscoveryTestSetup {
+ private static final Logger log =
LoggerFactory.getLogger(DiscoveryServiceTest.class);
+
private final static String TLS_CLIENT_CERT_FILE_PATH =
"./src/test/resources/certificate/client.crt";
private final static String TLS_CLIENT_KEY_FILE_PATH =
"./src/test/resources/certificate/client.key";
@@ -120,48 +125,40 @@ public class DiscoveryServiceTest extends
BaseDiscoveryTestSetup {
/**
* It verifies: client connects to Discovery-service and receives
discovery response successfully.
- *
+ *
* @throws Exception
*/
@Test
public void testClientServerConnection() throws Exception {
addBrokerToZk(2);
- // 1. client connects to DiscoveryService, 2. Client receive
service-lookup response
- final int messageTransfer = 2;
- final CountDownLatch latch = new CountDownLatch(messageTransfer);
- NioEventLoopGroup workerGroup =
connectToService(service.getServiceUrl(), latch, false);
- try {
- assertTrue(latch.await(1, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- fail("should have received lookup response message from server",
e);
- }
+
+ final CompletableFuture<BaseCommand> promise = new
CompletableFuture<>();
+ NioEventLoopGroup workerGroup =
connectToService(service.getServiceUrl(), promise, false);
+ assertEquals(promise.get(10, TimeUnit.SECONDS).getType(),
BaseCommand.Type.CONNECTED);
workerGroup.shutdownGracefully();
}
@Test
public void testClientServerConnectionTls() throws Exception {
addBrokerToZk(2);
- // 1. client connects to DiscoveryService, 2. Client receive
service-lookup response
- final int messageTransfer = 2;
- final CountDownLatch latch = new CountDownLatch(messageTransfer);
- NioEventLoopGroup workerGroup =
connectToService(service.getServiceUrlTls(), latch, true);
- try {
- assertTrue(latch.await(1, TimeUnit.SECONDS));
- } catch (InterruptedException e) {
- fail("should have received lookup response message from server",
e);
- }
+
+ final CompletableFuture<BaseCommand> promise = new
CompletableFuture<>();
+ NioEventLoopGroup workerGroup =
connectToService(service.getServiceUrlTls(), promise, true);
+ assertEquals(promise.get(10, TimeUnit.SECONDS).getType(),
BaseCommand.Type.CONNECTED);
workerGroup.shutdownGracefully();
}
/**
* creates ClientHandler channel to connect and communicate with server
- *
+ *
* @param serviceUrl
* @param latch
* @return
* @throws URISyntaxException
*/
- public static NioEventLoopGroup connectToService(String serviceUrl,
CountDownLatch latch, boolean tls)
+ public static NioEventLoopGroup connectToService(String serviceUrl,
+
CompletableFuture<BaseCommand> promise,
+ boolean tls)
throws URISyntaxException {
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
@@ -181,14 +178,14 @@ public class DiscoveryServiceTest extends
BaseDiscoveryTestSetup {
SslContext sslCtx = builder.build();
ch.pipeline().addLast("tls",
sslCtx.newHandler(ch.alloc()));
}
- ch.pipeline().addLast(new ClientHandler(latch));
+ ch.pipeline().addLast(new ClientHandler(promise));
}
});
URI uri = new URI(serviceUrl);
InetSocketAddress serviceAddress = new
InetSocketAddress(uri.getHost(), uri.getPort());
b.connect(serviceAddress).addListener((ChannelFuture future) -> {
if (!future.isSuccess()) {
- throw new IllegalStateException(future.cause());
+ promise.completeExceptionally(future.cause());
}
});
return workerGroup;
@@ -196,24 +193,37 @@ public class DiscoveryServiceTest extends
BaseDiscoveryTestSetup {
static class ClientHandler extends ChannelInboundHandlerAdapter {
- final CountDownLatch latch;
+ final CompletableFuture<BaseCommand> promise;
- public ClientHandler(CountDownLatch latch) {
- this.latch = latch;
+ public ClientHandler(CompletableFuture<BaseCommand> promise) {
+ this.promise = promise;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws
Exception {
- ByteBuf buffer = (ByteBuf) msg;
- buffer.release();
- latch.countDown();
+ try {
+ ByteBuf buffer = (ByteBuf) msg;
+ buffer.readUnsignedInt(); // discard frame length
+ int cmdSize = (int) buffer.readUnsignedInt();
+ buffer.writerIndex(buffer.readerIndex() + cmdSize);
+ ByteBufCodedInputStream cmdInputStream =
ByteBufCodedInputStream.get(buffer);
+ BaseCommand.Builder cmdBuilder = BaseCommand.newBuilder();
+ BaseCommand cmd = cmdBuilder.mergeFrom(cmdInputStream,
null).build();
+
+ cmdInputStream.recycle();
+ cmdBuilder.recycle();
+ buffer.release();
+
+ promise.complete(cmd);
+ } catch (Exception e) {
+ promise.completeExceptionally(e);
+ }
ctx.close();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable
cause) {
- // Close the connection when an exception is raised.
- cause.printStackTrace();
+ promise.completeExceptionally(cause);
ctx.close();
}
@@ -221,7 +231,6 @@ public class DiscoveryServiceTest extends
BaseDiscoveryTestSetup {
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
ctx.writeAndFlush(Commands.newConnect("", "", null));
- latch.countDown();
}
}