This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
new a4ae9cd ZOOKEEPER-3272: Clean up netty4 code per Norman Maurer's review comments
a4ae9cd is described below
commit a4ae9cd53b48d347d1f9f987416a4b390b560b36
Author: Ilya Maykov <[email protected]>
AuthorDate: Fri Feb 15 15:34:27 2019 +0100
ZOOKEEPER-3272: Clean up netty4 code per Norman Maurer's review comments
Netty4 code clean-up per the discussion in PR #753.
Author: Ilya Maykov <[email protected]>
Reviewers: [email protected]
Closes #819 from ivmaykov/ZOOKEEPER-3272-branch-3.5
---
.../apache/zookeeper/ClientCnxnSocketNetty.java | 43 +++++++----
.../org/apache/zookeeper/common/NettyUtils.java | 90 +++++++++++++++++++++-
.../apache/zookeeper/server/NettyServerCnxn.java | 47 ++++++++---
.../zookeeper/server/NettyServerCnxnFactory.java | 28 ++++---
.../zookeeper/server/NettyServerCnxnTest.java | 13 ++--
5 files changed, 176 insertions(+), 45 deletions(-)
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
index c4a7301..2f000fc 100755
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/ClientCnxnSocketNetty.java
@@ -49,6 +49,8 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslHandler;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.zookeeper.ClientCnxn.EndOfStreamException;
import org.apache.zookeeper.ClientCnxn.Packet;
import org.apache.zookeeper.client.ZKClientConfig;
@@ -82,7 +84,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
ClientCnxnSocketNetty(ZKClientConfig clientConfig) throws IOException {
this.clientConfig = clientConfig;
- eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+ // Client only has 1 outgoing socket, so the event loop group only needs
+ // a single thread.
+ eventLoopGroup = NettyUtils.newNioOrEpollEventLoopGroup(1 /* nThreads */);
initProperties();
}
@@ -143,6 +147,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// this lock guarantees that channel won't be assigned after cleanup().
+ boolean connected = false;
connectLock.lock();
try {
if (!channelFuture.isSuccess()) {
@@ -175,10 +180,13 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
} else {
needSasl.set(false);
}
- LOG.info("channel is connected: {}", channelFuture.channel());
+ connected = true;
} finally {
connectFuture = null;
connectLock.unlock();
+ if (connected) {
+ LOG.info("channel is connected: {}", channelFuture.channel());
+ }
// need to wake on connect success or failure to avoid
// timing out ClientCnxn.SendThread which may be
// blocked waiting for first connect in doTransport().
@@ -218,9 +226,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
@Override
void close() {
- if (!eventLoopGroup.isShuttingDown()) {
- eventLoopGroup.shutdownGracefully();
- }
+ eventLoopGroup.shutdownGracefully();
}
@Override
@@ -318,20 +324,23 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
return sendPkt(p, false);
}
+ // Use a single listener instance to reduce GC
+ private final GenericFutureListener<Future<Void>> onSendPktDoneListener = f -> {
+ if (f.isSuccess()) {
+ sentCount.getAndIncrement();
+ }
+ };
+
private ChannelFuture sendPkt(Packet p, boolean doFlush) {
// Assuming the packet will be sent out successfully. Because if it fails,
// the channel will close and clean up queues.
p.createBB();
updateLastSend();
- ChannelFuture result = channel.write(Unpooled.wrappedBuffer(p.bb));
- result.addListener(f -> {
- if (f.isSuccess()) {
- sentCount.getAndIncrement();
- }
- });
- if (doFlush) {
- channel.flush();
- }
+ final ByteBuf writeBuffer = Unpooled.wrappedBuffer(p.bb);
+ final ChannelFuture result = doFlush
+ ? channel.writeAndFlush(writeBuffer)
+ : channel.write(writeBuffer);
+ result.addListener(onSendPktDoneListener);
return result;
}
@@ -345,6 +354,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
*/
private void doWrite(List<Packet> pendingQueue, Packet p, ClientCnxn cnxn) {
updateNow();
+ boolean anyPacketsSent = false;
while (true) {
if (p != WakeupPacket.getInstance()) {
if ((p.requestHeader != null) &&
@@ -356,6 +366,7 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
}
}
sendPktOnly(p);
+ anyPacketsSent = true;
}
if (outgoingQueue.isEmpty()) {
break;
@@ -364,7 +375,9 @@ public class ClientCnxnSocketNetty extends ClientCnxnSocket {
}
// TODO: maybe we should flush in the loop above every N packets/bytes?
// But, how do we determine the right value for N ...
- channel.flush();
+ if (anyPacketsSent) {
+ channel.flush();
+ }
}
@Override
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
index 5883296..283ea87 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/common/NettyUtils.java
@@ -18,6 +18,15 @@
package org.apache.zookeeper.common;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.Set;
+
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.Epoll;
import io.netty.channel.epoll.EpollEventLoopGroup;
@@ -28,15 +37,22 @@ import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Helper methods for netty code.
*/
public class NettyUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(NettyUtils.class);
+
+ private static final int DEFAULT_INET_ADDRESS_COUNT = 1;
+
/**
* If {@link Epoll#isAvailable()} <code>== true</code>, returns a new
* {@link EpollEventLoopGroup}, otherwise returns a new
- * {@link NioEventLoopGroup}.
+ * {@link NioEventLoopGroup}. Creates the event loop group using the
+ * default number of threads.
* @return a new {@link EventLoopGroup}.
*/
public static EventLoopGroup newNioOrEpollEventLoopGroup() {
@@ -48,6 +64,22 @@ public class NettyUtils {
}
/**
+ * If {@link Epoll#isAvailable()} <code>== true</code>, returns a new
+ * {@link EpollEventLoopGroup}, otherwise returns a new
+ * {@link NioEventLoopGroup}. Creates the event loop group using the
+ * specified number of threads instead of the default.
+ * @param nThreads see {@link NioEventLoopGroup#NioEventLoopGroup(int)}.
+ * @return a new {@link EventLoopGroup}.
+ */
+ public static EventLoopGroup newNioOrEpollEventLoopGroup(int nThreads) {
+ if (Epoll.isAvailable()) {
+ return new EpollEventLoopGroup(nThreads);
+ } else {
+ return new NioEventLoopGroup(nThreads);
+ }
+ }
+
+ /**
* If {@link Epoll#isAvailable()} <code>== true</code>, returns
* {@link EpollSocketChannel}, otherwise returns {@link NioSocketChannel}.
* @return a socket channel class.
@@ -73,4 +105,60 @@ public class NettyUtils {
return NioServerSocketChannel.class;
}
}
+
+ /**
+ * Attempts to detect and return the number of local network addresses that could be
+ * used by a client to reach this server. This means we exclude the following address types:
+ * <ul>
+ * <li>Multicast addresses. Zookeeper server sockets use TCP, thus cannot bind to a multicast address.</li>
+ * <li>Link-local addresses. Routers don't forward traffic sent to a link-local address, so
+ * any realistic server deployment would not have clients using these.</li>
+ * <li>Loopback addresses. These are typically only used for testing.</li>
+ * </ul>
+ * Any remaining addresses are counted, and the total count is returned. This number is
+ * used to configure the number of threads for the "boss" event loop group, to make sure we have
+ * enough threads for each address in case the server is configured to listen on
+ * all available addresses.
+ * If listing the network interfaces fails, this method will return 1.
+ *
+ * @return the number of client-reachable local network addresses found, or
+ * 1 if listing the network interfaces fails.
+ */
+ public static int getClientReachableLocalInetAddressCount() {
+ try {
+ Set<InetAddress> validInetAddresses = new HashSet<>();
+ Enumeration<NetworkInterface> allNetworkInterfaces = NetworkInterface.getNetworkInterfaces();
+ for (NetworkInterface networkInterface : Collections.list(allNetworkInterfaces)) {
+ for (InetAddress inetAddress : Collections.list(networkInterface.getInetAddresses())) {
+ if (inetAddress.isLinkLocalAddress()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring link-local InetAddress {}", inetAddress);
+ }
+ continue;
+ }
+ if (inetAddress.isMulticastAddress()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring multicast InetAddress {}", inetAddress);
+ }
+ continue;
+ }
+ if (inetAddress.isLoopbackAddress()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Ignoring loopback InetAddress {}", inetAddress);
+ }
+ continue;
+ }
+ validInetAddresses.add(inetAddress);
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Detected {} local network addresses", validInetAddresses.size());
+ LOG.debug("Resolved local addresses are: {}", Arrays.toString(validInetAddresses.toArray()));
+ }
+ return validInetAddresses.size() > 0 ? validInetAddresses.size() : DEFAULT_INET_ADDRESS_COUNT;
+ } catch (SocketException ex) {
+ LOG.warn("Failed to list all network interfaces, assuming 1", ex);
+ return DEFAULT_INET_ADDRESS_COUNT;
+ }
+ }
}
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index 86adbe6..423fc7f 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -36,7 +36,8 @@ import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import org.apache.jute.BinaryInputArchive;
import org.apache.jute.Record;
import org.apache.zookeeper.WatchedEvent;
@@ -183,17 +184,20 @@ public class NettyServerCnxn extends ServerCnxn {
this.sessionId = sessionId;
}
+ // Use a single listener instance to reduce GC
+ private final GenericFutureListener<Future<Void>> onSendBufferDoneListener = f -> {
+ if (f.isSuccess()) {
+ packetSent();
+ }
+ };
+
@Override
public void sendBuffer(ByteBuffer sendBuffer) {
if (sendBuffer == ServerCnxnFactory.closeConn) {
close();
return;
}
- channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(f -> {
- if (f.isSuccess()) {
- packetSent();
- }
- });
+ channel.writeAndFlush(Unpooled.wrappedBuffer(sendBuffer)).addListener(onSendBufferDoneListener);
}
/**
@@ -282,12 +286,27 @@ public class NettyServerCnxn extends ServerCnxn {
}
/**
+ * Helper that throws an IllegalStateException if the current thread is not
+ * executing in the channel's event loop thread.
+ * @param callerMethodName the name of the calling method to add to the exception message.
+ */
+ private void checkIsInEventLoop(String callerMethodName) {
+ if (!channel.eventLoop().inEventLoop()) {
+ throw new IllegalStateException(
+ callerMethodName + "() called from non-EventLoop thread");
+ }
+ }
+
+ /**
* Process incoming message. This should only be called from the event
* loop thread.
+ * Note that this method does not call <code>buf.release()</code>. The caller
+ * is responsible for making sure the buf is released after this method
+ * returns.
* @param buf the message bytes to process.
*/
void processMessage(ByteBuf buf) {
- assert channel.eventLoop().inEventLoop();
+ checkIsInEventLoop("processMessage");
if (LOG.isDebugEnabled()) {
LOG.debug("0x{} queuedBuffer: {}",
Long.toHexString(sessionId),
@@ -346,7 +365,7 @@ public class NettyServerCnxn extends ServerCnxn {
* from the event loop thread.
*/
void processQueuedBuffer() {
- assert channel.eventLoop().inEventLoop();
+ checkIsInEventLoop("processQueuedBuffer");
if (queuedBuffer != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("processing queue 0x{} queuedBuffer {}",
@@ -362,6 +381,9 @@ public class NettyServerCnxn extends ServerCnxn {
releaseQueuedBuffer();
} else {
LOG.debug("Processed queue - bytes remaining");
+ // Possibly reduce memory consumption by freeing up buffer space
+ // which is no longer needed.
+ queuedBuffer.discardSomeReadBytes();
}
} else {
LOG.debug("queue empty");
@@ -373,9 +395,9 @@ public class NettyServerCnxn extends ServerCnxn {
* called from the event loop thread.
*/
private void releaseQueuedBuffer() {
- assert channel.eventLoop().inEventLoop();
+ checkIsInEventLoop("releaseQueuedBuffer");
if (queuedBuffer != null) {
- ReferenceCountUtil.release(queuedBuffer);
+ queuedBuffer.release();
queuedBuffer = null;
}
}
@@ -384,10 +406,13 @@ public class NettyServerCnxn extends ServerCnxn {
* Receive a message, which can come from the queued buffer or from a new
* buffer coming in over the channel. This should only be called from the
* event loop thread.
+ * Note that this method does not call <code>message.release()</code>. The
+ * caller is responsible for making sure the message is released after this
+ * method returns.
* @param message the message bytes to process.
*/
private void receiveMessage(ByteBuf message) {
- assert channel.eventLoop().inEventLoop();
+ checkIsInEventLoop("receiveMessage");
try {
while(message.isReadable() && !throttled.get()) {
if (bb != null) {
diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
index c28561f..382a8bf 100644
--- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
+++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxnFactory.java
@@ -192,14 +192,16 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
}
}
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ // Use a single listener instance to reduce GC
+ private final GenericFutureListener<Future<Void>> onWriteCompletedListener = (f) -> {
if (LOG.isTraceEnabled()) {
- promise.addListener((future) -> {
- LOG.trace("write {}",
- future.isSuccess() ? "complete" : "failed");
- });
+ LOG.trace("write {}", f.isSuccess() ? "complete" : "failed");
}
+ };
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ promise.addListener(onWriteCompletedListener);
super.write(ctx, msg, promise);
}
@@ -274,7 +276,8 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
NettyServerCnxnFactory() {
x509Util = new ClientX509Util();
- EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup();
+ EventLoopGroup bossGroup = NettyUtils.newNioOrEpollEventLoopGroup(
+ NettyUtils.getClientReachableLocalInetAddressCount());
EventLoopGroup workerGroup = NettyUtils.newNioOrEpollEventLoopGroup();
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
@@ -526,13 +529,14 @@ public class NettyServerCnxnFactory extends ServerCnxnFactory {
if (s.isEmpty()) {
ipMap.remove(remoteAddress);
}
- } else {
- LOG.error(
- "Unexpected null set for remote address {} when removing cnxn {}",
- remoteAddress,
- cnxn);
+ return;
}
}
+ // Fallthrough and log errors outside the synchronized block
+ LOG.error(
+ "Unexpected null set for remote address {} when removing cnxn {}",
+ remoteAddress,
+ cnxn);
}
@Override
diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
index 7c51a12..fd34804 100644
--- a/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
+++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/NettyServerCnxnTest.java
@@ -32,6 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.Matchers.equalTo;
@@ -81,7 +82,7 @@ public class NettyServerCnxnTest extends ClientBase {
final String path = "/a";
try {
// make sure zkclient works
- zk.create(path, "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ zk.create(path, "test".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
// set on watch
Assert.assertNotNull("Didn't create znode:" + path,
@@ -116,14 +117,14 @@ public class NettyServerCnxnTest extends ClientBase {
assertThat("Last client response size should be initialized with INIT_VALUE",
clientResponseStats.getLastBufferSize(),
equalTo(BufferStats.INIT_VALUE));
- zk.create("/a", "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ zk.create("/a", "test".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
assertThat("Last client response size should be greater than 0 after client request was performed",
clientResponseStats.getLastBufferSize(), greaterThan(0));
byte[] contents = zk.getData("/a", null, null);
- assertArrayEquals("unexpected data", "test".getBytes(), contents);
+ assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents);
}
}
@@ -134,7 +135,7 @@ public class NettyServerCnxnTest extends ClientBase {
assertThat("Last client response size should be initialized with INIT_VALUE",
clientResponseStats.getLastBufferSize(),
equalTo(BufferStats.INIT_VALUE));
- zk.create("/a", "test".getBytes(), Ids.OPEN_ACL_UNSAFE,
+ zk.create("/a", "test".getBytes(StandardCharsets.UTF_8), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
assertThat("Last client response size should be greater than 0 after client request was performed",
@@ -162,7 +163,7 @@ public class NettyServerCnxnTest extends ClientBase {
}
byte[] contents = zk.getData("/a", null, null);
- assertArrayEquals("unexpected data", "test".getBytes(), contents);
+ assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents);
// As above, but don't do the throttled read. Make the request bytes wait in the socket
// input buffer until after throttling is turned off. Need to make sure both modes work.
@@ -180,7 +181,7 @@ public class NettyServerCnxnTest extends ClientBase {
}
contents = zk.getData("/a", null, null);
- assertArrayEquals("unexpected data", "test".getBytes(), contents);
+ assertArrayEquals("unexpected data", "test".getBytes(StandardCharsets.UTF_8), contents);
}
}
}