Github user anmolnar commented on a diff in the pull request: https://github.com/apache/zookeeper/pull/669#discussion_r233289109 --- Diff: zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java --- @@ -103,71 +105,102 @@ boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. - return channel != null; + connectLock.lock(); + try { + return channel != null || connectFuture != null; + } finally { + connectLock.unlock(); + } + } + + private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { + ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); + if (testAllocator != null) { + return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); + } else { + return bootstrap; + } } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); - ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - - bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); - bootstrap.setOption("soLinger", -1); - bootstrap.setOption("tcpNoDelay", true); - - connectFuture = bootstrap.connect(addr); - connectFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - // this lock guarantees that channel won't be assgined after cleanup(). - connectLock.lock(); - try { - if (!channelFuture.isSuccess() || connectFuture == null) { - LOG.info("future isn't success, cause: {}", channelFuture.getCause()); - return; - } - // setup channel, variables, connection, etc. - channel = channelFuture.getChannel(); - - disconnected.set(false); - initialized = false; - lenBuffer.clear(); - incomingBuffer = lenBuffer; - - sendThread.primeConnection(); - updateNow(); - updateLastSendAndHeard(); - - if (sendThread.tunnelAuthInProgress()) { - waitSasl.drainPermits(); - needSasl.set(true); - sendPrimePacket(); - } else { - needSasl.set(false); - } + Bootstrap bootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(NettyUtils.nioOrEpollSocketChannel()) + .option(ChannelOption.SO_LINGER, -1) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); + bootstrap = configureBootstrapAllocator(bootstrap); + bootstrap.validate(); - // we need to wake up on first connect to avoid timeout. - wakeupCnxn(); - firstConnect.countDown(); - LOG.info("channel is connected: {}", channelFuture.getChannel()); - } finally { - connectLock.unlock(); + connectLock.lock(); + try { + connectFuture = bootstrap.connect(addr); + connectFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + // this lock guarantees that channel won't be assigned after cleanup(). + connectLock.lock(); + try { + if (!channelFuture.isSuccess()) { + LOG.info("future isn't success, cause:", channelFuture.cause()); + return; + } else if (connectFuture == null) { --- End diff -- How could `connectFuture` be null? `connectFuture.addListener` call would have already thrown NPE in that case.
---