Github user anmolnar commented on a diff in the pull request:

    https://github.com/apache/zookeeper/pull/669#discussion_r233289109
  
    --- Diff: 
zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---
    @@ -103,71 +105,102 @@
         boolean isConnected() {
             // Assuming that isConnected() is only used to initiate connection,
             // not used by some other connection status judgement.
    -        return channel != null;
    +        connectLock.lock();
    +        try {
    +            return channel != null || connectFuture != null;
    +        } finally {
    +            connectLock.unlock();
    +        }
    +    }
    +
    +    private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) {
    +        ByteBufAllocator testAllocator = TEST_ALLOCATOR.get();
    +        if (testAllocator != null) {
    +            return bootstrap.option(ChannelOption.ALLOCATOR, 
testAllocator);
    +        } else {
    +            return bootstrap;
    +        }
         }
     
         @Override
         void connect(InetSocketAddress addr) throws IOException {
             firstConnect = new CountDownLatch(1);
     
    -        ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
    -
    -        bootstrap.setPipelineFactory(new 
ZKClientPipelineFactory(addr.getHostString(), addr.getPort()));
    -        bootstrap.setOption("soLinger", -1);
    -        bootstrap.setOption("tcpNoDelay", true);
    -
    -        connectFuture = bootstrap.connect(addr);
    -        connectFuture.addListener(new ChannelFutureListener() {
    -            @Override
    -            public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
    -                // this lock guarantees that channel won't be assgined 
after cleanup().
    -                connectLock.lock();
    -                try {
    -                    if (!channelFuture.isSuccess() || connectFuture == 
null) {
    -                        LOG.info("future isn't success, cause: {}", 
channelFuture.getCause());
    -                        return;
    -                    }
    -                    // setup channel, variables, connection, etc.
    -                    channel = channelFuture.getChannel();
    -
    -                    disconnected.set(false);
    -                    initialized = false;
    -                    lenBuffer.clear();
    -                    incomingBuffer = lenBuffer;
    -
    -                    sendThread.primeConnection();
    -                    updateNow();
    -                    updateLastSendAndHeard();
    -
    -                    if (sendThread.tunnelAuthInProgress()) {
    -                        waitSasl.drainPermits();
    -                        needSasl.set(true);
    -                        sendPrimePacket();
    -                    } else {
    -                        needSasl.set(false);
    -                    }
    +        Bootstrap bootstrap = new Bootstrap()
    +                .group(eventLoopGroup)
    +                .channel(NettyUtils.nioOrEpollSocketChannel())
    +                .option(ChannelOption.SO_LINGER, -1)
    +                .option(ChannelOption.TCP_NODELAY, true)
    +                .handler(new ZKClientPipelineFactory(addr.getHostString(), 
addr.getPort()));
    +        bootstrap = configureBootstrapAllocator(bootstrap);
    +        bootstrap.validate();
     
    -                    // we need to wake up on first connect to avoid 
timeout.
    -                    wakeupCnxn();
    -                    firstConnect.countDown();
    -                    LOG.info("channel is connected: {}", 
channelFuture.getChannel());
    -                } finally {
    -                    connectLock.unlock();
    +        connectLock.lock();
    +        try {
    +            connectFuture = bootstrap.connect(addr);
    +            connectFuture.addListener(new ChannelFutureListener() {
    +                @Override
    +                public void operationComplete(ChannelFuture channelFuture) 
throws Exception {
    +                    // this lock guarantees that channel won't be assigned 
after cleanup().
    +                    connectLock.lock();
    +                    try {
    +                        if (!channelFuture.isSuccess()) {
    +                            LOG.info("future isn't success, cause:", 
channelFuture.cause());
    +                            return;
    +                        } else if (connectFuture == null) {
    --- End diff --
    
    How could `connectFuture` be null here?
    The `connectFuture.addListener` call would already have thrown an NPE in that case.


---

Reply via email to