ZOOKEEPER-3152: Port ZK netty stack to netty4

Summary: Ported the client connection netty stack from netty3 to netty4. This includes both the server side (NettyServerCnxn and friends) and the client side (ClientCnxnSocketNetty).
Test Plan: Modified `FourLetterWordsTest` and `NettyServerCnxnTest`, plus manual testing on a regional ensemble. FB Reviewers: nixon Author: Ilya Maykov <[email protected]> Reviewers: [email protected] Closes #669 from ivmaykov/ZOOKEEPER-3152 Project: http://git-wip-us.apache.org/repos/asf/zookeeper/repo Commit: http://git-wip-us.apache.org/repos/asf/zookeeper/commit/caca0627 Tree: http://git-wip-us.apache.org/repos/asf/zookeeper/tree/caca0627 Diff: http://git-wip-us.apache.org/repos/asf/zookeeper/diff/caca0627 Branch: refs/heads/master Commit: caca062767c36525e6ecead2ae0f34c447394809 Parents: 1507f67 Author: Ilya Maykov <[email protected]> Authored: Thu Nov 22 17:56:01 2018 +0100 Committer: Andor Molnar <[email protected]> Committed: Thu Nov 22 17:56:01 2018 +0100 ---------------------------------------------------------------------- build.xml | 2 +- ivy.xml | 4 +- .../org/apache/zookeeper/ClientCnxnSocket.java | 9 +- .../apache/zookeeper/ClientCnxnSocketNIO.java | 4 +- .../apache/zookeeper/ClientCnxnSocketNetty.java | 312 +++++++----- .../org/apache/zookeeper/common/NettyUtils.java | 76 +++ .../zookeeper/server/NettyServerCnxn.java | 364 +++++++++----- .../server/NettyServerCnxnFactory.java | 474 ++++++++++--------- .../server/quorum/UnifiedServerSocket.java | 6 +- .../apache/zookeeper/ClientCnxnSocketTest.java | 13 + .../zookeeper/server/NettyServerCnxnTest.java | 71 +++ .../org/apache/zookeeper/test/ClientTest.java | 1 + .../zookeeper/test/NettyNettySuiteBase.java | 13 + .../zookeeper/test/NioNettySuiteBase.java | 13 + .../org/apache/zookeeper/test/ReconfigTest.java | 79 ++-- .../zookeeper/test/TestByteBufAllocator.java | 152 ++++++ .../test/TestByteBufAllocatorTestHelper.java | 52 ++ 17 files changed, 1126 insertions(+), 519 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/build.xml ---------------------------------------------------------------------- diff --git 
a/build.xml b/build.xml index 3411025..5868532 100644 --- a/build.xml +++ b/build.xml @@ -36,7 +36,7 @@ xmlns:cs="antlib:com.puppycrawl.tools.checkstyle.ant"> <property name="audience-annotations.version" value="0.5.0" /> - <property name="netty.version" value="3.10.6.Final"/> + <property name="netty.version" value="4.1.29.Final"/> <property name="junit.version" value="4.12"/> <property name="mockito.version" value="1.8.5"/> http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/ivy.xml ---------------------------------------------------------------------- diff --git a/ivy.xml b/ivy.xml index 8692640..c7f79b6 100644 --- a/ivy.xml +++ b/ivy.xml @@ -59,8 +59,8 @@ <dependency org="org.apache.yetus" name="audience-annotations" rev="${audience-annotations.version}"/> - <dependency org="io.netty" name="netty" conf="default" rev="${netty.version}"> - <artifact name="netty" type="jar" conf="default"/> + <dependency org="io.netty" name="netty-all" conf="default" rev="${netty.version}"> + <artifact name="netty-all" type="jar" conf="default"/> </dependency> <dependency org="com.googlecode.json-simple" name="json-simple" rev="${json.version}" > http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java index 51ae8bf..ba3806c 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocket.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.List; import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.atomic.AtomicLong; import org.apache.jute.BinaryInputArchive; import 
org.apache.zookeeper.ClientCnxn.Packet; @@ -59,8 +60,8 @@ abstract class ClientCnxnSocket { * readLength() to receive the full message. */ protected ByteBuffer incomingBuffer = lenBuffer; - protected long sentCount = 0; - protected long recvCount = 0; + protected final AtomicLong sentCount = new AtomicLong(0L); + protected final AtomicLong recvCount = new AtomicLong(0L); protected long lastHeard; protected long lastSend; protected long now; @@ -95,11 +96,11 @@ abstract class ClientCnxnSocket { } long getSentCount() { - return sentCount; + return sentCount.get(); } long getRecvCount() { - return recvCount; + return recvCount.get(); } void updateLastHeard() { http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java index f17a819..4c97721 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNIO.java @@ -82,7 +82,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket { if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { - recvCount++; + recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); @@ -122,7 +122,7 @@ public class ClientCnxnSocketNIO extends ClientCnxnSocket { } sock.write(p.bb); if (!p.bb.hasRemaining()) { - sentCount++; + sentCount.getAndIncrement(); outgoingQueue.removeFirstOccurrence(p); if (p.requestHeader != null && p.requestHeader.getType() != OpCode.ping http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java 
---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java index 34c3db3..74d1283 100755 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java @@ -18,46 +18,45 @@ package org.apache.zookeeper; -import org.apache.zookeeper.ClientCnxn.EndOfStreamException; -import org.apache.zookeeper.ClientCnxn.Packet; -import org.apache.zookeeper.client.ZKClientConfig; -import org.apache.zookeeper.common.ClientX509Util; -import org.jboss.netty.bootstrap.ClientBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFactory; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.handler.ssl.SslHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; - import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; import java.util.Iterator; import java.util.List; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import 
java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslHandler; +import org.apache.zookeeper.ClientCnxn.EndOfStreamException; +import org.apache.zookeeper.ClientCnxn.Packet; +import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.common.ClientX509Util; +import org.apache.zookeeper.common.NettyUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import static org.apache.zookeeper.common.X509Exception.SSLContextException; /** @@ -68,18 +67,21 @@ import static org.apache.zookeeper.common.X509Exception.SSLContextException; public class ClientCnxnSocketNetty extends ClientCnxnSocket { private static final Logger LOG = LoggerFactory.getLogger(ClientCnxnSocketNetty.class); - ChannelFactory channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(), Executors.newCachedThreadPool()); - Channel channel; - CountDownLatch firstConnect; - ChannelFuture connectFuture; - Lock connectLock = new ReentrantLock(); - AtomicBoolean disconnected = new AtomicBoolean(); - AtomicBoolean needSasl = new AtomicBoolean(); - Semaphore waitSasl = new Semaphore(0); + private final 
EventLoopGroup eventLoopGroup; + private Channel channel; + private CountDownLatch firstConnect; + private ChannelFuture connectFuture; + private final Lock connectLock = new ReentrantLock(); + private final AtomicBoolean disconnected = new AtomicBoolean(); + private final AtomicBoolean needSasl = new AtomicBoolean(); + private final Semaphore waitSasl = new Semaphore(0); + + private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR = + new AtomicReference<>(null); ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException { this.clientConfig = clientConfig; + eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(); initProperties(); } @@ -103,59 +105,90 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { boolean isConnected() { // Assuming that isConnected() is only used to initiate connection, // not used by some other connection status judgement. - return channel != null; + connectLock.lock(); + try { + return channel != null || connectFuture != null; + } finally { + connectLock.unlock(); + } + } + + private Bootstrap configureBootstrapAllocator(Bootstrap bootstrap) { + ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); + if (testAllocator != null) { + return bootstrap.option(ChannelOption.ALLOCATOR, testAllocator); + } else { + return bootstrap; + } } @Override void connect(InetSocketAddress addr) throws IOException { firstConnect = new CountDownLatch(1); - ClientBootstrap bootstrap = new ClientBootstrap(channelFactory); - - bootstrap.setPipelineFactory(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); - bootstrap.setOption("soLinger", -1); - bootstrap.setOption("tcpNoDelay", true); - - connectFuture = bootstrap.connect(addr); - connectFuture.addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture channelFuture) throws Exception { - // this lock guarantees that channel won't be assgined after cleanup(). 
- connectLock.lock(); - try { - if (!channelFuture.isSuccess() || connectFuture == null) { - LOG.info("future isn't success, cause: {}", channelFuture.getCause()); - return; - } - // setup channel, variables, connection, etc. - channel = channelFuture.getChannel(); - - disconnected.set(false); - initialized = false; - lenBuffer.clear(); - incomingBuffer = lenBuffer; - - sendThread.primeConnection(); - updateNow(); - updateLastSendAndHeard(); - - if (sendThread.tunnelAuthInProgress()) { - waitSasl.drainPermits(); - needSasl.set(true); - sendPrimePacket(); - } else { - needSasl.set(false); - } + Bootstrap bootstrap = new Bootstrap() + .group(eventLoopGroup) + .channel(NettyUtils.nioOrEpollSocketChannel()) + .option(ChannelOption.SO_LINGER, -1) + .option(ChannelOption.TCP_NODELAY, true) + .handler(new ZKClientPipelineFactory(addr.getHostString(), addr.getPort())); + bootstrap = configureBootstrapAllocator(bootstrap); + bootstrap.validate(); - // we need to wake up on first connect to avoid timeout. - wakeupCnxn(); - firstConnect.countDown(); - LOG.info("channel is connected: {}", channelFuture.getChannel()); - } finally { - connectLock.unlock(); + connectLock.lock(); + try { + connectFuture = bootstrap.connect(addr); + connectFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture channelFuture) throws Exception { + // this lock guarantees that channel won't be assigned after cleanup(). + connectLock.lock(); + try { + if (!channelFuture.isSuccess()) { + LOG.info("future isn't success, cause:", channelFuture.cause()); + return; + } else if (connectFuture == null) { + LOG.info("connect attempt cancelled"); + // If the connect attempt was cancelled but succeeded + // anyway, make sure to close the channel, otherwise + // we may leak a file descriptor. + channelFuture.channel().close(); + return; + } + // setup channel, variables, connection, etc. 
+ channel = channelFuture.channel(); + + disconnected.set(false); + initialized = false; + lenBuffer.clear(); + incomingBuffer = lenBuffer; + + sendThread.primeConnection(); + updateNow(); + updateLastSendAndHeard(); + + if (sendThread.tunnelAuthInProgress()) { + waitSasl.drainPermits(); + needSasl.set(true); + sendPrimePacket(); + } else { + needSasl.set(false); + } + LOG.info("channel is connected: {}", channelFuture.channel()); + } finally { + connectFuture = null; + connectLock.unlock(); + // need to wake on connect success or failure to avoid + // timing out ClientCnxn.SendThread which may be + // blocked waiting for first connect in doTransport(). + wakeupCnxn(); + firstConnect.countDown(); + } } - } - }); + }); + } finally { + connectLock.unlock(); + } } @Override @@ -163,11 +196,11 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { connectLock.lock(); try { if (connectFuture != null) { - connectFuture.cancel(); + connectFuture.cancel(false); connectFuture = null; } if (channel != null) { - channel.close().awaitUninterruptibly(); + channel.close().syncUninterruptibly(); channel = null; } } finally { @@ -184,7 +217,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { @Override void close() { - channelFactory.releaseExternalResources(); + if (!eventLoopGroup.isShuttingDown()) { + eventLoopGroup.shutdownGracefully(); + } } @Override @@ -199,6 +234,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { @Override void packetAdded() { + // NO-OP. Adding a packet will already wake up a netty connection + // so we don't need to add a dummy packet to the queue to trigger + // a wake-up. } @Override @@ -230,13 +268,11 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { return; } } else { - if ((head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS)) == null) { - return; - } + head = outgoingQueue.poll(waitTimeOut, TimeUnit.MILLISECONDS); } // check if being waken up on closing. 
if (!sendThread.getZkState().isAlive()) { - // adding back the patck to notify of failure in conLossPacket(). + // adding back the packet to notify of failure in conLossPacket(). addBack(head); return; } @@ -261,18 +297,46 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { } } - private void sendPkt(Packet p) { + /** + * Sends a packet to the remote peer and flushes the channel. + * @param p packet to send. + * @return a ChannelFuture that will complete when the write operation + * succeeds or fails. + */ + private ChannelFuture sendPktAndFlush(Packet p) { + return sendPkt(p, true); + } + + /** + * Sends a packet to the remote peer but does not flush() the channel. + * @param p packet to send. + * @return a ChannelFuture that will complete when the write operation + * succeeds or fails. + */ + private ChannelFuture sendPktOnly(Packet p) { + return sendPkt(p, false); + } + + private ChannelFuture sendPkt(Packet p, boolean doFlush) { // Assuming the packet will be sent out successfully. Because if it fails, // the channel will close and clean up queues. p.createBB(); updateLastSend(); - sentCount++; - channel.write(ChannelBuffers.wrappedBuffer(p.bb)); + ChannelFuture result = channel.write(Unpooled.wrappedBuffer(p.bb)); + result.addListener(f -> { + if (f.isSuccess()) { + sentCount.getAndIncrement(); + } + }); + if (doFlush) { + channel.flush(); + } + return result; } private void sendPrimePacket() { // assuming the first packet is the priming packet. - sendPkt(outgoingQueue.remove()); + sendPktAndFlush(outgoingQueue.remove()); } /** @@ -290,13 +354,16 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { pendingQueue.add(p); } } - sendPkt(p); + sendPktOnly(p); } if (outgoingQueue.isEmpty()) { break; } p = outgoingQueue.remove(); } + // TODO: maybe we should flush in the loop above every N packets/bytes? + // But, how do we determine the right value for N ... 
+ channel.flush(); } @Override @@ -304,19 +371,19 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { if (channel == null) { throw new IOException("channel has been closed"); } - sendPkt(p); + sendPktAndFlush(p); } @Override SocketAddress getRemoteSocketAddress() { Channel copiedChanRef = channel; - return (copiedChanRef == null) ? null : copiedChanRef.getRemoteAddress(); + return (copiedChanRef == null) ? null : copiedChanRef.remoteAddress(); } @Override SocketAddress getLocalSocketAddress() { Channel copiedChanRef = channel; - return (copiedChanRef == null) ? null : copiedChanRef.getLocalAddress(); + return (copiedChanRef == null) ? null : copiedChanRef.localAddress(); } @Override @@ -345,7 +412,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { * ZKClientPipelineFactory is the netty pipeline factory for this netty * connection implementation. */ - private class ZKClientPipelineFactory implements ChannelPipelineFactory { + private class ZKClientPipelineFactory extends ChannelInitializer<SocketChannel> { private SSLContext sslContext = null; private SSLEngine sslEngine = null; private String host; @@ -357,13 +424,12 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { } @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline pipeline = Channels.pipeline(); + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); if (clientConfig.getBoolean(ZKClientConfig.SECURE_CLIENT)) { initSSL(pipeline); } pipeline.addLast("handler", new ZKClientHandler()); - return pipeline; } // The synchronized is to prevent the race on shared variable "sslEngine". 
@@ -375,7 +441,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { sslEngine.setUseClientMode(true); } pipeline.addLast("ssl", new SslHandler(sslEngine)); - LOG.info("SSL handler added for channel: {}", pipeline.getChannel()); + LOG.info("SSL handler added for channel: {}", pipeline.channel()); } } @@ -383,13 +449,12 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { * ZKClientHandler is the netty handler that sits in netty upstream last * place. It mainly handles read traffic and helps synchronize connection state. */ - private class ZKClientHandler extends SimpleChannelUpstreamHandler { + private class ZKClientHandler extends SimpleChannelInboundHandler<ByteBuf> { AtomicBoolean channelClosed = new AtomicBoolean(false); @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception { - LOG.info("channel is disconnected: {}", ctx.getChannel()); + public void channelInactive(ChannelHandlerContext ctx) throws Exception { + LOG.info("channel is disconnected: {}", ctx.channel()); cleanup(); } @@ -406,11 +471,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { } @Override - public void messageReceived(ChannelHandlerContext ctx, - MessageEvent e) throws Exception { + protected void channelRead0(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { updateNow(); - ChannelBuffer buf = (ChannelBuffer) e.getMessage(); - while (buf.readable()) { + while (buf.isReadable()) { if (incomingBuffer.remaining() > buf.readableBytes()) { int newLimit = incomingBuffer.position() + buf.readableBytes(); @@ -422,7 +485,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { if (!incomingBuffer.hasRemaining()) { incomingBuffer.flip(); if (incomingBuffer == lenBuffer) { - recvCount++; + recvCount.getAndIncrement(); readLength(); } else if (!initialized) { readConnectResult(); @@ -439,13 +502,34 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket { } } wakeupCnxn(); + // 
Note: SimpleChannelInboundHandler releases the ByteBuf for us + // so we don't need to do it. } @Override - public void exceptionCaught(ChannelHandlerContext ctx, - ExceptionEvent e) throws Exception { - LOG.warn("Exception caught: {}", e, e.getCause()); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + LOG.warn("Exception caught", cause); cleanup(); } } + + /** + * Sets the test ByteBufAllocator. This allocator will be used by all + * future instances of this class. + * It is not recommended to use this method outside of testing. + * @param allocator the ByteBufAllocator to use for all netty buffer + * allocations. + */ + static void setTestAllocator(ByteBufAllocator allocator) { + TEST_ALLOCATOR.set(allocator); + } + + /** + * Clears the test ByteBufAllocator. The default allocator will be used + * by all future instances of this class. + * It is not recommended to use this method outside of testing. + */ + static void clearTestAllocator() { + TEST_ALLOCATOR.set(null); + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java new file mode 100644 index 0000000..5883296 --- /dev/null +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.common; + +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.Epoll; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.epoll.EpollServerSocketChannel; +import io.netty.channel.epoll.EpollSocketChannel; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.ServerSocketChannel; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +/** + * Helper methods for netty code. + */ +public class NettyUtils { + /** + * If {@link Epoll#isAvailable()} <code>== true</code>, returns a new + * {@link EpollEventLoopGroup}, otherwise returns a new + * {@link NioEventLoopGroup}. + * @return a new {@link EventLoopGroup}. + */ + public static EventLoopGroup newNioOrEpollEventLoopGroup() { + if (Epoll.isAvailable()) { + return new EpollEventLoopGroup(); + } else { + return new NioEventLoopGroup(); + } + } + + /** + * If {@link Epoll#isAvailable()} <code>== true</code>, returns + * {@link EpollSocketChannel}, otherwise returns {@link NioSocketChannel}. + * @return a socket channel class. + */ + public static Class<? extends SocketChannel> nioOrEpollSocketChannel() { + if (Epoll.isAvailable()) { + return EpollSocketChannel.class; + } else { + return NioSocketChannel.class; + } + } + + /** + * If {@link Epoll#isAvailable()} <code>== true</code>, returns + * {@link EpollServerSocketChannel}, otherwise returns + * {@link NioServerSocketChannel}. 
+ * @return a server socket channel class. + */ + public static Class<? extends ServerSocketChannel> nioOrEpollServerSocketChannel() { + if (Epoll.isAvailable()) { + return EpollServerSocketChannel.class; + } else { + return NioServerSocketChannel.class; + } + } +} http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java index f0a8f7f..311d3c1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java @@ -18,23 +18,26 @@ package org.apache.zookeeper.server; -import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer; - import java.io.BufferedWriter; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.nio.ByteBuffer; +import java.nio.channels.SelectionKey; import java.security.cert.Certificate; import java.util.Arrays; -import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; +import io.netty.buffer.Unpooled; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.util.ReferenceCountUtil; import org.apache.jute.BinaryInputArchive; -import org.apache.jute.BinaryOutputArchive; import org.apache.jute.Record; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.proto.ReplyHeader; @@ -43,29 +46,23 @@ import 
org.apache.zookeeper.server.command.CommandExecutor; import org.apache.zookeeper.server.command.FourLetterCommands; import org.apache.zookeeper.server.command.NopCommand; import org.apache.zookeeper.server.command.SetTraceMaskCommand; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.MessageEvent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class NettyServerCnxn extends ServerCnxn { private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class); - Channel channel; - ChannelBuffer queuedBuffer; - volatile boolean throttled; - ByteBuffer bb; - ByteBuffer bbLen = ByteBuffer.allocate(4); - long sessionId; - int sessionTimeout; - Certificate[] clientChain; - volatile boolean closingChannel; - - NettyServerCnxnFactory factory; - boolean initialized; + private final Channel channel; + private ByteBuf queuedBuffer; + private final AtomicBoolean throttled = new AtomicBoolean(false); + private ByteBuffer bb; + private final ByteBuffer bbLen = ByteBuffer.allocate(4); + private long sessionId; + private int sessionTimeout; + private Certificate[] clientChain; + private volatile boolean closingChannel; + + private final NettyServerCnxnFactory factory; + private boolean initialized; NettyServerCnxn(Channel channel, ZooKeeperServer zks, NettyServerCnxnFactory factory) { super(zks); @@ -82,8 +79,8 @@ public class NettyServerCnxn extends ServerCnxn { closingChannel = true; if (LOG.isDebugEnabled()) { - LOG.debug("close called for sessionid:0x" - + Long.toHexString(sessionId)); + LOG.debug("close called for sessionid:0x{}", + Long.toHexString(sessionId)); } setStale(); @@ -92,28 +89,23 @@ public class NettyServerCnxn extends ServerCnxn { // connection bean leak under certain race conditions. 
factory.unregisterConnection(this); - synchronized(factory.cnxns){ - // if this is not in cnxns then it's already closed - if (!factory.cnxns.remove(this)) { - if (LOG.isDebugEnabled()) { - LOG.debug("cnxns size:" + factory.cnxns.size()); - } - return; - } + // if this is not in cnxns then it's already closed + if (!factory.cnxns.remove(this)) { if (LOG.isDebugEnabled()) { - LOG.debug("close in progress for sessionid:0x" - + Long.toHexString(sessionId)); + LOG.debug("cnxns size:{}", factory.cnxns.size()); } + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("close in progress for sessionid:0x{}", + Long.toHexString(sessionId)); + } - factory.removeCnxnFromSessionMap(this); + factory.removeCnxnFromSessionMap(this); - synchronized (factory.ipMap) { - Set<NettyServerCnxn> s = - factory.ipMap.get(((InetSocketAddress)channel - .getRemoteAddress()).getAddress()); - s.remove(this); - } - } + factory.removeCnxnFromIpMap( + this, + ((InetSocketAddress)channel.remoteAddress()).getAddress()); if (zkServer != null) { zkServer.removeCnxn(this); @@ -123,7 +115,14 @@ public class NettyServerCnxn extends ServerCnxn { // Since we don't check on the futures created by write calls to the channel complete we need to make sure // that all writes have been completed before closing the channel or we risk data loss // See: http://lists.jboss.org/pipermail/netty-users/2009-August/001122.html - channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE); + channel.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + future.channel().close().addListener(f -> releaseQueuedBuffer()); + } + }); + } else { + channel.eventLoop().execute(this::releaseQueuedBuffer); } } @@ -160,21 +159,6 @@ public class NettyServerCnxn extends ServerCnxn { } } - static class ResumeMessageEvent implements MessageEvent { - Channel channel; - ResumeMessageEvent(Channel channel) { - this.channel 
= channel; - } - @Override - public Object getMessage() {return null;} - @Override - public SocketAddress getRemoteAddress() {return null;} - @Override - public Channel getChannel() {return channel;} - @Override - public ChannelFuture getFuture() {return null;} - }; - @Override public void sendResponse(ReplyHeader h, Record r, String tag) throws IOException { @@ -192,28 +176,18 @@ public class NettyServerCnxn extends ServerCnxn { } @Override - public void enableRecv() { - if (throttled) { - throttled = false; - if (LOG.isDebugEnabled()) { - LOG.debug("Sending unthrottle event " + this); - } - channel.getPipeline().sendUpstream(new ResumeMessageEvent(channel)); - } - } - - @Override public void sendBuffer(ByteBuffer sendBuffer) { if (sendBuffer == ServerCnxnFactory.closeConn) { close(); return; } - channel.write(wrappedBuffer(sendBuffer)); - packetSent(); + channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(f -> { + if (f.isSuccess()) { + packetSent(); + } + }); } - - /** * This class wraps the sendBuffer method of NIOServerCnxn. It is * responsible for chunking up the response to a client. Rather @@ -255,9 +229,7 @@ public class NettyServerCnxn extends ServerCnxn { } /** Return if four letter word found and responded to, otw false **/ - private boolean checkFourLetterWord(final Channel channel, - ChannelBuffer message, final int len) throws IOException - { + private boolean checkFourLetterWord(final Channel channel, ByteBuf message, final int len) { // We take advantage of the limited size of the length to look // for cmds. They are all 4-bytes which fits inside of an int if (!FourLetterCommands.isKnown(len)) { @@ -266,7 +238,10 @@ public class NettyServerCnxn extends ServerCnxn { String cmd = FourLetterCommands.getCommandString(len); - channel.setInterestOps(0).awaitUninterruptibly(); + // Stops automatic reads of incoming data on this channel. 
We don't + // expect any more traffic from the client when processing a 4LW + // so this shouldn't break anything. + channel.config().setAutoRead(false); packetReceived(4); final PrintWriter pwriter = new PrintWriter( @@ -281,8 +256,7 @@ public class NettyServerCnxn extends ServerCnxn { return true; } - LOG.info("Processing " + cmd + " command from " - + channel.getRemoteAddress()); + LOG.info("Processing {} command from {}", cmd, channel.remoteAddress()); if (len == FourLetterCommands.setTraceMaskCmd) { ByteBuffer mask = ByteBuffer.allocate(8); @@ -299,19 +273,126 @@ public class NettyServerCnxn extends ServerCnxn { } } - public void receiveMessage(ChannelBuffer message) { + /** + * Process incoming message. This should only be called from the event + * loop thread. + * @param buf the message bytes to process. + */ + void processMessage(ByteBuf buf) { + assert channel.eventLoop().inEventLoop(); + if (LOG.isDebugEnabled()) { + LOG.debug("0x{} queuedBuffer: {}", + Long.toHexString(sessionId), + queuedBuffer); + } + + if (LOG.isTraceEnabled()) { + LOG.trace("0x{} buf {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(buf)); + } + + if (throttled.get()) { + LOG.debug("Received message while throttled"); + // we are throttled, so we need to queue + if (queuedBuffer == null) { + LOG.debug("allocating queue"); + queuedBuffer = channel.alloc().buffer(buf.readableBytes()); + } + queuedBuffer.writeBytes(buf); + if (LOG.isTraceEnabled()) { + LOG.trace("0x{} queuedBuffer {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(queuedBuffer)); + } + } else { + LOG.debug("not throttled"); + if (queuedBuffer != null) { + queuedBuffer.writeBytes(buf); + processQueuedBuffer(); + } else { + receiveMessage(buf); + // Have to check !closingChannel, because an error in + // receiveMessage() could have led to close() being called. 
+ if (!closingChannel && buf.isReadable()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Before copy {}", buf); + } + if (queuedBuffer == null) { + queuedBuffer = channel.alloc().buffer(buf.readableBytes()); + } + queuedBuffer.writeBytes(buf); + if (LOG.isTraceEnabled()) { + LOG.trace("Copy is {}", queuedBuffer); + LOG.trace("0x{} queuedBuffer {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(queuedBuffer)); + } + } + } + } + } + + /** + * Try to process previously queued message. This should only be called + * from the event loop thread. + */ + void processQueuedBuffer() { + assert channel.eventLoop().inEventLoop(); + if (queuedBuffer != null) { + if (LOG.isTraceEnabled()) { + LOG.trace("processing queue 0x{} queuedBuffer {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(queuedBuffer)); + } + receiveMessage(queuedBuffer); + if (closingChannel) { + // close() could have been called if receiveMessage() failed + LOG.debug("Processed queue - channel closed, dropping remaining bytes"); + } else if (!queuedBuffer.isReadable()) { + LOG.debug("Processed queue - no bytes remaining"); + releaseQueuedBuffer(); + } else { + LOG.debug("Processed queue - bytes remaining"); + } + } else { + LOG.debug("queue empty"); + } + } + + /** + * Clean up queued buffer once it's no longer needed. This should only be + * called from the event loop thread. + */ + private void releaseQueuedBuffer() { + assert channel.eventLoop().inEventLoop(); + if (queuedBuffer != null) { + ReferenceCountUtil.release(queuedBuffer); + queuedBuffer = null; + } + } + + /** + * Receive a message, which can come from the queued buffer or from a new + * buffer coming in over the channel. This should only be called from the + * event loop thread. + * @param message the message bytes to process. 
+ */ + private void receiveMessage(ByteBuf message) { + assert channel.eventLoop().inEventLoop(); try { - while(message.readable() && !throttled) { + while(message.isReadable() && !throttled.get()) { if (bb != null) { if (LOG.isTraceEnabled()) { - LOG.trace("message readable " + message.readableBytes() - + " bb len " + bb.remaining() + " " + bb); + LOG.trace("message readable {} bb len {} {}", + message.readableBytes(), + bb.remaining(), + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); - LOG.trace(Long.toHexString(sessionId) - + " bb 0x" - + ChannelBuffers.hexDump( - ChannelBuffers.copiedBuffer(dat))); + LOG.trace("0x{} bb {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat))); } if (bb.remaining() > message.readableBytes()) { @@ -322,16 +403,15 @@ public class NettyServerCnxn extends ServerCnxn { bb.limit(bb.capacity()); if (LOG.isTraceEnabled()) { - LOG.trace("after readBytes message readable " - + message.readableBytes() - + " bb len " + bb.remaining() + " " + bb); + LOG.trace("after readBytes message readable {} bb len {} {}", + message.readableBytes(), + bb.remaining(), + bb); ByteBuffer dat = bb.duplicate(); dat.flip(); - LOG.trace("after readbytes " - + Long.toHexString(sessionId) - + " bb 0x" - + ChannelBuffers.hexDump( - ChannelBuffers.copiedBuffer(dat))); + LOG.trace("after readbytes 0x{} bb {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat))); } if (bb.remaining() == 0) { bb.flip(); @@ -342,10 +422,14 @@ public class NettyServerCnxn extends ServerCnxn { throw new IOException("ZK down"); } if (initialized) { + // TODO: if zks.processPacket() is changed to take a ByteBuffer[], + // we could implement zero-copy queueing. 
zks.processPacket(this, bb); } else { - LOG.debug("got conn req request from " - + getRemoteSocketAddress()); + if (LOG.isDebugEnabled()) { + LOG.debug("got conn req request from {}", + getRemoteSocketAddress()); + } zks.processConnectRequest(this, bb); initialized = true; } @@ -353,15 +437,14 @@ public class NettyServerCnxn extends ServerCnxn { } } else { if (LOG.isTraceEnabled()) { - LOG.trace("message readable " - + message.readableBytes() - + " bblenrem " + bbLen.remaining()); + LOG.trace("message readable {} bblenrem {}", + message.readableBytes(), + bbLen.remaining()); ByteBuffer dat = bbLen.duplicate(); dat.flip(); - LOG.trace(Long.toHexString(sessionId) - + " bbLen 0x" - + ChannelBuffers.hexDump( - ChannelBuffers.copiedBuffer(dat))); + LOG.trace("0x{} bbLen {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(Unpooled.wrappedBuffer(dat))); } if (message.readableBytes() < bbLen.remaining()) { @@ -373,15 +456,15 @@ public class NettyServerCnxn extends ServerCnxn { bbLen.flip(); if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(sessionId) - + " bbLen 0x" - + ChannelBuffers.hexDump( - ChannelBuffers.copiedBuffer(bbLen))); + LOG.trace("0x{} bbLen {}", + Long.toHexString(sessionId), + ByteBufUtil.hexDump(Unpooled.wrappedBuffer(bbLen))); } int len = bbLen.getInt(); if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(sessionId) - + " bbLen len is " + len); + LOG.trace("0x{} bbLen len is {}", + Long.toHexString(sessionId), + len); } bbLen.clear(); @@ -403,16 +486,38 @@ public class NettyServerCnxn extends ServerCnxn { } } + /** + * An event that triggers a change in the channel's "Auto Read" setting. + * Used for throttling. By using an enum we can treat the two values as + * singletons and compare with ==. + */ + enum AutoReadEvent { + DISABLE, + ENABLE + } + + /** + * Note that the netty implementation ignores the <code>waitDisableRecv</code> + * parameter and is always asynchronous. + * @param waitDisableRecv ignored by this implementation. 
+ */ @Override public void disableRecv(boolean waitDisableRecv) { - throttled = true; - if (LOG.isDebugEnabled()) { - LOG.debug("Throttling - disabling recv " + this); + if (throttled.compareAndSet(false, true)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Throttling - disabling recv {}", this); + } + channel.pipeline().fireUserEventTriggered(AutoReadEvent.DISABLE); } - ChannelFuture cf = channel.setReadable(false); + } - if (waitDisableRecv) { - cf.awaitUninterruptibly(); + @Override + public void enableRecv() { + if (throttled.compareAndSet(true, false)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Sending unthrottle event {}", this); + } + channel.pipeline().fireUserEventTriggered(AutoReadEvent.ENABLE); } } @@ -423,12 +528,26 @@ public class NettyServerCnxn extends ServerCnxn { @Override public int getInterestOps() { - return channel.getInterestOps(); + // This might not be 100% right, but it's only used for printing + // connection info in the netty implementation so it's probably ok. + if (channel == null || !channel.isOpen()) { + return 0; + } + int interestOps = 0; + if (!throttled.get()) { + interestOps |= SelectionKey.OP_READ; + } + if (!channel.isWritable()) { + // OP_READ means "can read", but OP_WRITE means "cannot write", + // it's weird. + interestOps |= SelectionKey.OP_WRITE; + } + return interestOps; } @Override public InetSocketAddress getRemoteSocketAddress() { - return (InetSocketAddress)channel.getRemoteAddress(); + return (InetSocketAddress)channel.remoteAddress(); } /** Send close connection packet to the client. @@ -469,4 +588,9 @@ public class NettyServerCnxn extends ServerCnxn { clientChain = Arrays.copyOf(chain, chain.length); } } + + // For tests and NettyServerCnxnFactory only, thus package-private. 
+ Channel getChannel() { + return channel; + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java index d3abf38..99de0e6 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java @@ -18,41 +18,6 @@ package org.apache.zookeeper.server; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.common.ClientX509Util; -import org.apache.zookeeper.common.X509Exception; -import org.apache.zookeeper.common.X509Exception.SSLContextException; -import org.apache.zookeeper.server.auth.ProviderRegistry; -import org.apache.zookeeper.server.auth.X509AuthenticationProvider; -import org.jboss.netty.bootstrap.ServerBootstrap; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.channel.Channel; -import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.channel.ChannelFutureListener; -import org.jboss.netty.channel.ChannelHandler.Sharable; -import org.jboss.netty.channel.ChannelHandlerContext; -import org.jboss.netty.channel.ChannelPipeline; -import org.jboss.netty.channel.ChannelPipelineFactory; -import org.jboss.netty.channel.ChannelStateEvent; -import org.jboss.netty.channel.Channels; -import org.jboss.netty.channel.ExceptionEvent; -import org.jboss.netty.channel.MessageEvent; -import org.jboss.netty.channel.SimpleChannelHandler; -import org.jboss.netty.channel.WriteCompletionEvent; -import org.jboss.netty.channel.group.ChannelGroup; -import org.jboss.netty.channel.group.DefaultChannelGroup; -import 
org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; -import org.jboss.netty.handler.ssl.SslHandler; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLPeerUnverifiedException; -import javax.net.ssl.SSLSession; -import javax.net.ssl.X509KeyManager; -import javax.net.ssl.X509TrustManager; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; @@ -61,51 +26,86 @@ import java.security.NoSuchAlgorithmException; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Set; -import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLEngine; +import javax.net.ssl.SSLPeerUnverifiedException; +import javax.net.ssl.SSLSession; +import javax.net.ssl.X509KeyManager; +import javax.net.ssl.X509TrustManager; -import static org.jboss.netty.buffer.ChannelBuffers.dynamicBuffer; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.group.ChannelGroup; +import io.netty.channel.group.ChannelGroupFuture; +import io.netty.channel.group.DefaultChannelGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.handler.ssl.SslHandler; +import io.netty.util.AttributeKey; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.DefaultEventExecutor; 
+import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.common.ClientX509Util; +import org.apache.zookeeper.common.NettyUtils; +import org.apache.zookeeper.common.X509Exception; +import org.apache.zookeeper.common.X509Exception.SSLContextException; +import org.apache.zookeeper.server.auth.ProviderRegistry; +import org.apache.zookeeper.server.auth.X509AuthenticationProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class NettyServerCnxnFactory extends ServerCnxnFactory { private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxnFactory.class); - ServerBootstrap bootstrap; - Channel parentChannel; - ChannelGroup allChannels = new DefaultChannelGroup("zkServerCnxns"); - Map<InetAddress, Set<NettyServerCnxn>> ipMap = - new HashMap<InetAddress, Set<NettyServerCnxn>>( ); - InetSocketAddress localAddress; - int maxClientCnxns = 60; - ClientX509Util x509Util; + private final ServerBootstrap bootstrap; + private Channel parentChannel; + private final ChannelGroup allChannels = + new DefaultChannelGroup("zkServerCnxns", new DefaultEventExecutor()); + // Access to ipMap or to any Set contained in the map needs to be + // protected with synchronized (ipMap) { ... } + private final Map<InetAddress, Set<NettyServerCnxn>> ipMap = new HashMap<>(); + private InetSocketAddress localAddress; + private int maxClientCnxns = 60; + private final ClientX509Util x509Util; + + private static final AttributeKey<NettyServerCnxn> CONNECTION_ATTRIBUTE = + AttributeKey.valueOf("NettyServerCnxn"); + + private static final AtomicReference<ByteBufAllocator> TEST_ALLOCATOR = + new AtomicReference<>(null); /** - * This is an inner class since we need to extend SimpleChannelHandler, but + * This is an inner class since we need to extend ChannelDuplexHandler, but * NettyServerCnxnFactory already extends ServerCnxnFactory. 
By making it inner * this class gets access to the member variables and methods. */ @Sharable - class CnxnChannelHandler extends SimpleChannelHandler { - - @Override - public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) - throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("Channel closed " + e); - } - allChannels.remove(ctx.getChannel()); - } + class CnxnChannelHandler extends ChannelDuplexHandler { @Override - public void channelConnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception - { + public void channelActive(ChannelHandlerContext ctx) throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("Channel connected " + e); + LOG.trace("Channel active {}", ctx.channel()); } - Channel channel = ctx.getChannel(); - InetAddress addr = ((InetSocketAddress) channel.getRemoteAddress()) + final Channel channel = ctx.channel(); + InetAddress addr = ((InetSocketAddress) channel.remoteAddress()) .getAddress(); if (maxClientCnxns > 0 && getClientCnxnCount(addr) >= maxClientCnxns) { LOG.warn("Too many connections from {} - max is {}", addr, @@ -116,170 +116,104 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { NettyServerCnxn cnxn = new NettyServerCnxn(channel, zkServer, NettyServerCnxnFactory.this); - ctx.setAttachment(cnxn); + ctx.channel().attr(CONNECTION_ATTRIBUTE).set(cnxn); if (secure) { - SslHandler sslHandler = ctx.getPipeline().get(SslHandler.class); - ChannelFuture handshakeFuture = sslHandler.handshake(); + SslHandler sslHandler = ctx.pipeline().get(SslHandler.class); + Future<Channel> handshakeFuture = sslHandler.handshakeFuture(); handshakeFuture.addListener(new CertificateVerifier(sslHandler, cnxn)); } else { - allChannels.add(ctx.getChannel()); + allChannels.add(ctx.channel()); addCnxn(cnxn); } } @Override - public void channelDisconnected(ChannelHandlerContext ctx, - ChannelStateEvent e) throws Exception - { + public void channelInactive(ChannelHandlerContext ctx) throws Exception { if 
(LOG.isTraceEnabled()) { - LOG.trace("Channel disconnected " + e); + LOG.trace("Channel inactive {}", ctx.channel()); } - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + allChannels.remove(ctx.channel()); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isTraceEnabled()) { - LOG.trace("Channel disconnect caused close " + e); + LOG.trace("Channel inactive caused close {}", cnxn); } cnxn.close(); } } @Override - public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) - throws Exception - { - LOG.warn("Exception caught " + e, e.getCause()); - NettyServerCnxn cnxn = (NettyServerCnxn) ctx.getAttachment(); + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.warn("Exception caught", cause); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).getAndSet(null); if (cnxn != null) { if (LOG.isDebugEnabled()) { - LOG.debug("Closing " + cnxn); + LOG.debug("Closing {}", cnxn); } cnxn.close(); } } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) - throws Exception - { - if (LOG.isTraceEnabled()) { - LOG.trace("message received called " + e.getMessage()); - } + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { try { - if (LOG.isDebugEnabled()) { - LOG.debug("New message " + e.toString() - + " from " + ctx.getChannel()); - } - NettyServerCnxn cnxn = (NettyServerCnxn)ctx.getAttachment(); - synchronized(cnxn) { - processMessage(e, cnxn); + if (evt == NettyServerCnxn.AutoReadEvent.ENABLE) { + LOG.debug("Received AutoReadEvent.ENABLE"); + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + // TODO(ilyam): Not sure if cnxn can be null here. It becomes null if channelInactive() + // or exceptionCaught() trigger, but it's unclear to me if userEventTriggered() can run + // after either of those. Check for null just to be safe ... 
+ if (cnxn != null) { + cnxn.processQueuedBuffer(); + } + ctx.channel().config().setAutoRead(true); + } else if (evt == NettyServerCnxn.AutoReadEvent.DISABLE) { + LOG.debug("Received AutoReadEvent.DISABLE"); + ctx.channel().config().setAutoRead(false); } - } catch(Exception ex) { - LOG.error("Unexpected exception in receive", ex); - throw ex; + } finally { + ReferenceCountUtil.release(evt); } } - private void processMessage(MessageEvent e, NettyServerCnxn cnxn) { - if (LOG.isDebugEnabled()) { - LOG.debug(Long.toHexString(cnxn.sessionId) + " queuedBuffer: " - + cnxn.queuedBuffer); - } - - if (e instanceof NettyServerCnxn.ResumeMessageEvent) { - LOG.debug("Received ResumeMessageEvent"); - if (cnxn.queuedBuffer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("processing queue " - + Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - cnxn.receiveMessage(cnxn.queuedBuffer); - if (!cnxn.queuedBuffer.readable()) { - LOG.debug("Processed queue - no bytes remaining"); - cnxn.queuedBuffer = null; - } else { - LOG.debug("Processed queue - bytes remaining"); - } - } else { - LOG.debug("queue empty"); - } - cnxn.channel.setReadable(true); - } else { - ChannelBuffer buf = (ChannelBuffer)e.getMessage(); + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + try { if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " buf 0x" - + ChannelBuffers.hexDump(buf)); + LOG.trace("message received called {}", msg); } - - if (cnxn.throttled) { - LOG.debug("Received message while throttled"); - // we are throttled, so we need to queue - if (cnxn.queuedBuffer == null) { - LOG.debug("allocating queue"); - cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); - } - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); + try { + if 
(LOG.isDebugEnabled()) { + LOG.debug("New message {} from {}", msg, ctx.channel()); } - } else { - LOG.debug("not throttled"); - if (cnxn.queuedBuffer != null) { - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - - cnxn.receiveMessage(cnxn.queuedBuffer); - if (!cnxn.queuedBuffer.readable()) { - LOG.debug("Processed queue - no bytes remaining"); - cnxn.queuedBuffer = null; - } else { - LOG.debug("Processed queue - bytes remaining"); - } + NettyServerCnxn cnxn = ctx.channel().attr(CONNECTION_ATTRIBUTE).get(); + if (cnxn == null) { + LOG.error("channelRead() on a closed or closing NettyServerCnxn"); } else { - cnxn.receiveMessage(buf); - if (buf.readable()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Before copy " + buf); - } - cnxn.queuedBuffer = dynamicBuffer(buf.readableBytes()); - cnxn.queuedBuffer.writeBytes(buf); - if (LOG.isTraceEnabled()) { - LOG.trace("Copy is " + cnxn.queuedBuffer); - LOG.trace(Long.toHexString(cnxn.sessionId) - + " queuedBuffer 0x" - + ChannelBuffers.hexDump(cnxn.queuedBuffer)); - } - } + cnxn.processMessage((ByteBuf) msg); } + } catch (Exception ex) { + LOG.error("Unexpected exception in receive", ex); + throw ex; } + } finally { + ReferenceCountUtil.release(msg); } } @Override - public void writeComplete(ChannelHandlerContext ctx, - WriteCompletionEvent e) throws Exception - { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (LOG.isTraceEnabled()) { - LOG.trace("write complete " + e); + promise.addListener((future) -> { + LOG.trace("write {}", + future.isSuccess() ? 
"complete" : "failed"); + }); } + super.write(ctx, msg, promise); } - private final class CertificateVerifier - implements ChannelFutureListener { + private final class CertificateVerifier implements GenericFutureListener<Future<Channel>> { private final SslHandler sslHandler; private final NettyServerCnxn cnxn; @@ -291,12 +225,13 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { /** * Only allow the connection to stay open if certificate passes auth */ - public void operationComplete(ChannelFuture future) - throws SSLPeerUnverifiedException { + public void operationComplete(Future<Channel> future) throws SSLPeerUnverifiedException { if (future.isSuccess()) { - LOG.debug("Successful handshake with session 0x{}", - Long.toHexString(cnxn.sessionId)); - SSLEngine eng = sslHandler.getEngine(); + if (LOG.isDebugEnabled()) { + LOG.debug("Successful handshake with session 0x{}", + Long.toHexString(cnxn.getSessionId())); + } + SSLEngine eng = sslHandler.engine(); SSLSession session = eng.getSession(); cnxn.setClientCertificateChain(session.getPeerCertificates()); @@ -316,16 +251,17 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { if (KeeperException.Code.OK != authProvider.handleAuthentication(cnxn, null)) { LOG.error("Authentication failed for session 0x{}", - Long.toHexString(cnxn.sessionId)); + Long.toHexString(cnxn.getSessionId())); cnxn.close(); return; } - allChannels.add(future.getChannel()); + final Channel futureChannel = future.getNow(); + allChannels.add(Objects.requireNonNull(futureChannel)); addCnxn(cnxn); } else { LOG.error("Unsuccessful handshake with session 0x{}", - Long.toHexString(cnxn.sessionId)); + Long.toHexString(cnxn.getSessionId())); cnxn.close(); } } @@ -334,30 +270,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { CnxnChannelHandler channelHandler = new CnxnChannelHandler(); - NettyServerCnxnFactory() { - bootstrap = new ServerBootstrap( - new NioServerSocketChannelFactory( - 
Executors.newCachedThreadPool(), - Executors.newCachedThreadPool())); - // parent channel - bootstrap.setOption("reuseAddress", true); - // child channels - bootstrap.setOption("child.tcpNoDelay", true); - /* set socket linger to off, so that socket close does not block */ - bootstrap.setOption("child.soLinger", -1); - bootstrap.setPipelineFactory(new ChannelPipelineFactory() { - @Override - public ChannelPipeline getPipeline() throws Exception { - ChannelPipeline p = Channels.pipeline(); - if (secure) { - initSSL(p); - } - p.addLast("servercnxnfactory", channelHandler); + private ServerBootstrap configureBootstrapAllocator(ServerBootstrap bootstrap) { + ByteBufAllocator testAllocator = TEST_ALLOCATOR.get(); + if (testAllocator != null) { + return bootstrap + .option(ChannelOption.ALLOCATOR, testAllocator) + .childOption(ChannelOption.ALLOCATOR, testAllocator); + } else { + return bootstrap; + } + } - return p; - } - }); + NettyServerCnxnFactory() { x509Util = new ClientX509Util(); + + EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(); + EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup(); + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) + .channel(NettyUtils.nioOrEpollServerSocketChannel()) + // parent channel options + .option(ChannelOption.SO_REUSEADDR, true) + // child channels options + .childOption(ChannelOption.TCP_NODELAY, true) + .childOption(ChannelOption.SO_LINGER, -1) + .childHandler(new ChannelInitializer<SocketChannel>() { + @Override + protected void initChannel(SocketChannel ch) throws Exception { + ChannelPipeline pipeline = ch.pipeline(); + if (secure) { + initSSL(pipeline); + } + pipeline.addLast("servercnxnfactory", channelHandler); + } + }); + this.bootstrap = configureBootstrapAllocator(bootstrap); + this.bootstrap.validate(); } private synchronized void initSSL(ChannelPipeline p) @@ -390,7 +338,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { 
sslEngine.setNeedClientAuth(true); p.addLast("ssl", new SslHandler(sslEngine)); - LOG.info("SSL handler added for channel: {}", p.getChannel()); + LOG.info("SSL handler added for channel: {}", p.channel()); } @Override @@ -440,7 +388,7 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { return localAddress.getPort(); } - boolean killed; + private boolean killed; // use synchronized(this) to access @Override public void join() throws InterruptedException { synchronized(this) { @@ -452,16 +400,42 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { @Override public void shutdown() { - LOG.info("shutdown called " + localAddress); + synchronized (this) { + if (killed) { + LOG.info("already shutdown {}", localAddress); + return; + } + } + LOG.info("shutdown called {}", localAddress); + if (login != null) { login.shutdown(); } + + final EventLoopGroup bossGroup = bootstrap.config().group(); + final EventLoopGroup workerGroup = bootstrap.config().childGroup(); // null if factory never started if (parentChannel != null) { - parentChannel.close().awaitUninterruptibly(); + ChannelFuture parentCloseFuture = parentChannel.close(); + if (bossGroup != null) { + parentCloseFuture.addListener(future -> { + bossGroup.shutdownGracefully(); + }); + } closeAll(); - allChannels.close().awaitUninterruptibly(); - bootstrap.releaseExternalResources(); + ChannelGroupFuture allChannelsCloseFuture = allChannels.close(); + if (workerGroup != null) { + allChannelsCloseFuture.addListener(future -> { + workerGroup.shutdownGracefully(); + }); + } + } else { + if (bossGroup != null) { + bossGroup.shutdownGracefully(); + } + if (workerGroup != null) { + workerGroup.shutdownGracefully(); + } } if (zkServer != null) { @@ -475,16 +449,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { @Override public void start() { - LOG.info("binding to port " + localAddress); - parentChannel = bootstrap.bind(localAddress); + LOG.info("binding to port {}", 
localAddress); + parentChannel = bootstrap.bind(localAddress).syncUninterruptibly().channel(); + // Port changes after bind() if the original port was 0, update + // localAddress to get the real port. + localAddress = (InetSocketAddress) parentChannel.localAddress(); + LOG.info("bound to port " + getLocalPort()); } public void reconfigure(InetSocketAddress addr) { Channel oldChannel = parentChannel; try { LOG.info("binding to port {}", addr); - parentChannel = bootstrap.bind(addr); - localAddress = addr; + parentChannel = bootstrap.bind(addr).syncUninterruptibly().channel(); + // Port changes after bind() if the original port was 0, update + // localAddress to get the real port. + localAddress = (InetSocketAddress) parentChannel.localAddress(); + LOG.info("bound to port " + getLocalPort()); } catch (Exception e) { LOG.error("Error while reconfiguring", e); } finally { @@ -517,21 +498,39 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { cnxns.add(cnxn); synchronized (ipMap){ InetAddress addr = - ((InetSocketAddress)cnxn.channel.getRemoteAddress()) - .getAddress(); + ((InetSocketAddress)cnxn.getChannel().remoteAddress()).getAddress(); Set<NettyServerCnxn> s = ipMap.get(addr); if (s == null) { - s = new HashSet<NettyServerCnxn>(); + s = new HashSet<>(); + ipMap.put(addr, s); } s.add(cnxn); - ipMap.put(addr,s); + } + } + + void removeCnxnFromIpMap(NettyServerCnxn cnxn, InetAddress remoteAddress) { + synchronized (ipMap) { + Set<NettyServerCnxn> s = ipMap.get(remoteAddress); + if (s != null) { + s.remove(cnxn); + if (s.isEmpty()) { + ipMap.remove(remoteAddress); + } + } else { + LOG.error( + "Unexpected null set for remote address {} when removing cnxn {}", + remoteAddress, + cnxn); + } } } private int getClientCnxnCount(InetAddress addr) { - Set<NettyServerCnxn> s = ipMap.get(addr); - if (s == null) return 0; - return s.size(); + synchronized (ipMap) { + Set<NettyServerCnxn> s = ipMap.get(addr); + if (s == null) return 0; + return s.size(); + } } 
@Override @@ -552,4 +551,23 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory { return info; } + /** + * Sets the test ByteBufAllocator. This allocator will be used by all + * future instances of this class. + * It is not recommended to use this method outside of testing. + * @param allocator the ByteBufAllocator to use for all netty buffer + * allocations. + */ + static void setTestAllocator(ByteBufAllocator allocator) { + TEST_ALLOCATOR.set(allocator); + } + + /** + * Clears the test ByteBufAllocator. The default allocator will be used + * by all future instances of this class. + * It is not recommended to use this method outside of testing. + */ + static void clearTestAllocator() { + TEST_ALLOCATOR.set(null); + } } http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java index 4802ecf..d1e3ba5 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/UnifiedServerSocket.java @@ -18,10 +18,10 @@ package org.apache.zookeeper.server.quorum; +import io.netty.buffer.Unpooled; +import io.netty.handler.ssl.SslHandler; import org.apache.zookeeper.common.X509Exception; import org.apache.zookeeper.common.X509Util; -import org.jboss.netty.buffer.ChannelBuffers; -import org.jboss.netty.handler.ssl.SslHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -61,7 +61,7 @@ public class UnifiedServerSocket extends ServerSocket { int bytesRead = prependableSocket.getInputStream().read(litmus, 0, 5); prependableSocket.prependToInputStream(litmus); - if (bytesRead == 5 && 
SslHandler.isEncrypted(ChannelBuffers.wrappedBuffer(litmus))) { + if (bytesRead == 5 && SslHandler.isEncrypted(Unpooled.wrappedBuffer(litmus))) { LOG.info(getInetAddress() + " attempting to connect over ssl"); SSLSocket sslSocket; try { http://git-wip-us.apache.org/repos/asf/zookeeper/blob/caca0627/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java ---------------------------------------------------------------------- diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java index 054e1ed..0550bcf 100644 --- a/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/ClientCnxnSocketTest.java @@ -23,10 +23,23 @@ import static org.junit.Assert.fail; import java.io.IOException; import org.apache.zookeeper.client.ZKClientConfig; +import org.apache.zookeeper.test.TestByteBufAllocator; import org.apache.zookeeper.common.ZKConfig; +import org.junit.After; +import org.junit.Before; import org.junit.Test; public class ClientCnxnSocketTest { + @Before + public void setUp() { + ClientCnxnSocketNetty.setTestAllocator(TestByteBufAllocator.getInstance()); + } + + @After + public void tearDown() { + ClientCnxnSocketNetty.clearTestAllocator(); + TestByteBufAllocator.checkForLeaks(); + } @Test public void testWhenInvalidJuteMaxBufferIsConfiguredIOExceptionIsThrown() {
