This is an automated email from the ASF dual-hosted git repository.
andor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zookeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 5375c25 ZOOKEEPER-3274: Use CompositeByteBuf to queue data in
NettyServerCnxn
5375c25 is described below
commit 5375c2542977223ff9beaeb62d6ee5aed783118e
Author: Ilya Maykov <[email protected]>
AuthorDate: Mon Feb 18 17:05:51 2019 +0100
ZOOKEEPER-3274: Use CompositeByteBuf to queue data in NettyServerCnxn
This avoids unnecessary buffer copies and resizes.
Author: Ilya Maykov <[email protected]>
Reviewers: [email protected]
Closes #810 from ivmaykov/ZOOKEEPER-3274
---
.../apache/zookeeper/server/NettyServerCnxn.java | 35 +++++++++++++++++-----
1 file changed, 27 insertions(+), 8 deletions(-)
diff --git
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
index 4b69905..261dfe1 100644
---
a/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
+++
b/zookeeper-server/src/main/java/org/apache/zookeeper/server/NettyServerCnxn.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
@@ -53,7 +54,7 @@ import org.slf4j.LoggerFactory;
public class NettyServerCnxn extends ServerCnxn {
private static final Logger LOG =
LoggerFactory.getLogger(NettyServerCnxn.class);
private final Channel channel;
- private ByteBuf queuedBuffer;
+ private CompositeByteBuf queuedBuffer;
private final AtomicBoolean throttled = new AtomicBoolean(false);
private ByteBuffer bb;
private final ByteBuffer bbLen = ByteBuffer.allocate(4);
@@ -292,6 +293,24 @@ public class NettyServerCnxn extends ServerCnxn {
}
/**
+ * Appends <code>buf</code> to <code>queuedBuffer</code>. Does not
duplicate <code>buf</code>
+ * or call any flavor of {@link ByteBuf#retain()}. Caller must ensure that
<code>buf</code>
+ * is not owned by anyone else, as this call transfers ownership of
<code>buf</code> to the
+ * <code>queuedBuffer</code>.
+ *
+ * This method should only be called from the event loop thread.
+ * @param buf the buffer to append to the queue.
+ */
+ private void appendToQueuedBuffer(ByteBuf buf) {
+ checkIsInEventLoop("appendToQueuedBuffer");
+ if (queuedBuffer.numComponents() == queuedBuffer.maxNumComponents()) {
+ // queuedBuffer has reached its component limit, so combine the
existing components.
+ queuedBuffer.consolidate();
+ }
+ queuedBuffer.addComponent(true, buf);
+ }
+
+ /**
* Process incoming message. This should only be called from the event
* loop thread.
* Note that this method does not call <code>buf.release()</code>. The
caller
@@ -318,9 +337,9 @@ public class NettyServerCnxn extends ServerCnxn {
// we are throttled, so we need to queue
if (queuedBuffer == null) {
LOG.debug("allocating queue");
- queuedBuffer = channel.alloc().buffer(buf.readableBytes());
+ queuedBuffer = channel.alloc().compositeBuffer();
}
- queuedBuffer.writeBytes(buf);
+ appendToQueuedBuffer(buf.retainedDuplicate());
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} queuedBuffer {}",
Long.toHexString(sessionId),
@@ -329,7 +348,7 @@ public class NettyServerCnxn extends ServerCnxn {
} else {
LOG.debug("not throttled");
if (queuedBuffer != null) {
- queuedBuffer.writeBytes(buf);
+ appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
receiveMessage(buf);
@@ -340,9 +359,9 @@ public class NettyServerCnxn extends ServerCnxn {
LOG.trace("Before copy {}", buf);
}
if (queuedBuffer == null) {
- queuedBuffer =
channel.alloc().buffer(buf.readableBytes());
+ queuedBuffer = channel.alloc().compositeBuffer();
}
- queuedBuffer.writeBytes(buf);
+ appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(),
buf.readableBytes()));
if (LOG.isTraceEnabled()) {
LOG.trace("Copy is {}", queuedBuffer);
LOG.trace("0x{} queuedBuffer {}",
@@ -375,9 +394,9 @@ public class NettyServerCnxn extends ServerCnxn {
releaseQueuedBuffer();
} else {
LOG.debug("Processed queue - bytes remaining");
- // Possibly reduce memory consumption by freeing up buffer
space
+ // Try to reduce memory consumption by freeing up buffer space
// which is no longer needed.
- queuedBuffer.discardSomeReadBytes();
+ queuedBuffer.discardReadComponents();
}
} else {
LOG.debug("queue empty");