This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 396ec12dc7 Optimize ReadResponse for small entry sizes (#3597)
396ec12dc7 is described below

commit 396ec12dc747fc21105d65f13d2dedace247d1a2
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 2 10:42:51 2022 -0700

    Optimize ReadResponse for small entry sizes (#3597)
    
    * Optimize ReadResponse for small entry sizes
    
    * Fixed checkstyle
---
 .../bookkeeper/proto/BookieProtoEncoding.java      | 49 ++++++++++++++--------
 1 file changed, 32 insertions(+), 17 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 70962dfd63..d804a41b0e 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -28,7 +28,6 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufInputStream;
 import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler.Sharable;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -51,6 +50,13 @@ import org.slf4j.LoggerFactory;
 public class BookieProtoEncoding {
     private static final Logger LOG = 
LoggerFactory.getLogger(BookieProtoEncoding.class);
 
+    /**
+     * Threshold under which an entry is considered to be "small".
+     *
+     * Small entry payloads are copied instead of being passed around as references.
+     */
+    public static final int SMALL_ENTRY_SIZE_THRESHOLD = 16 * 1024;
+
     /**
      * An encoder/decoder interface for the Bookkeeper protocol.
      */
@@ -246,38 +252,47 @@ public class BookieProtoEncoding {
                 return msg;
             }
             BookieProtocol.Response r = (BookieProtocol.Response) msg;
-            ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame 
size */);
-            buf.writerIndex(4); // Leave the placeholder for the frame size
-            buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), (short) 0));
 
             try {
                 if (msg instanceof BookieProtocol.ReadResponse) {
+                    BookieProtocol.ReadResponse rr = 
(BookieProtocol.ReadResponse) r;
+                    int payloadSize = rr.getData().readableBytes();
+                    int responseSize = RESPONSE_HEADERS_SIZE + payloadSize;
+                    boolean isSmallEntry = payloadSize < 
SMALL_ENTRY_SIZE_THRESHOLD;
+
+                    int bufferSize = 4 /* frame size */ + RESPONSE_HEADERS_SIZE
+                            + (isSmallEntry ? payloadSize : 0);
+                    ByteBuf buf = allocator.buffer(bufferSize);
+                    buf.writeInt(responseSize);
+                    buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), (short) 0));
                     buf.writeInt(r.getErrorCode());
                     buf.writeLong(r.getLedgerId());
                     buf.writeLong(r.getEntryId());
 
-                    BookieProtocol.ReadResponse rr = 
(BookieProtocol.ReadResponse) r;
-                    if (rr.hasData()) {
-                        int frameSize = RESPONSE_HEADERS_SIZE + 
rr.getData().readableBytes();
-                        buf.setInt(0, frameSize);
-                        return ByteBufList.get(buf, rr.getData());
-                    } else {
-                        buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
+                    if (isSmallEntry) {
+                        buf.writeBytes(rr.getData());
                         return buf;
+                    } else {
+                        return ByteBufList.get(buf, rr.getData());
                     }
                 } else if (msg instanceof BookieProtocol.AddResponse) {
+                    ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 
/* frame size */);
+                    buf.writeInt(RESPONSE_HEADERS_SIZE);
+                    buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), (short) 0));
                     buf.writeInt(r.getErrorCode());
                     buf.writeLong(r.getLedgerId());
                     buf.writeLong(r.getEntryId());
-                    buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
-
                     return buf;
                 } else if (msg instanceof BookieProtocol.AuthResponse) {
                     BookkeeperProtocol.AuthMessage am = 
((BookieProtocol.AuthResponse) r).getAuthMessage();
-                    ByteBuf payload = Unpooled.wrappedBuffer(am.toByteArray());
-                    int frameSize = 4 + payload.readableBytes();
-                    buf.setInt(0, frameSize);
-                    return ByteBufList.get(buf, payload);
+                    int payloadSize = 4 + am.getSerializedSize();
+                    int bufferSize = payloadSize + 4 /* frame size */;
+
+                    ByteBuf buf = allocator.buffer(bufferSize);
+                    buf.writeInt(payloadSize);
+                    buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), (short) 0));
+                    buf.writeBytes(am.toByteArray());
+                    return buf;
                 } else {
                     LOG.error("Cannot encode unknown response type {}", 
msg.getClass().getName());
                     return msg;

Reply via email to