This is an automated email from the ASF dual-hosted git repository.

yong pushed a commit to branch branch-4.15
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git

commit ed55d4184ee949e2a56459ac4aacf05410d3ac43
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Oct 31 21:05:42 2022 -0700

    Avoid extra buffer to prepend frame size (#3560)
    
    * Avoid extra buffer to prepend frame size
    
    * Fixed checkstyle
    
    * Fixed touch methods on ReadResponse
    
    * Fixed frame size for protobuf requests
    
    * Removed unwanted changes
    
    * Fixed AuthResponse in v2 protocol
    
    * Fixed test
    
    (cherry picked from commit 02be9d2ee28cd7c4a1895fc8494db96381484a20)
---
 .../apache/bookkeeper/proto/BookieNettyServer.java |  6 +---
 .../bookkeeper/proto/BookieProtoEncoding.java      | 42 ++++++++++++++++------
 .../apache/bookkeeper/proto/BookieProtocol.java    |  9 +++--
 .../bookkeeper/proto/PerChannelBookieClient.java   |  6 ++--
 .../org/apache/bookkeeper/util/ByteBufList.java    | 24 +------------
 .../bookkeeper/proto/BookieProtoEncodingTest.java  |  6 ++--
 6 files changed, 44 insertions(+), 49 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index b301bd91ba..7b70a12f3a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -49,7 +49,6 @@ import io.netty.channel.local.LocalServerChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslHandler;
 import io.netty.util.concurrent.DefaultThreadFactory;
@@ -332,11 +331,9 @@ class BookieNettyServer {
 
                     pipeline.addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
 
-                    // For ByteBufList, skip the usual LengthFieldPrepender 
and have the encoder itself to add it
-                    pipeline.addLast("bytebufList", 
ByteBufList.ENCODER_WITH_SIZE);
+                    pipeline.addLast("bytebufList", ByteBufList.ENCODER);
 
                     pipeline.addLast("lengthbaseddecoder", new 
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
-                    pipeline.addLast("lengthprepender", new 
LengthFieldPrepender(4));
 
                     pipeline.addLast("bookieProtoDecoder", new 
BookieProtoEncoding.RequestDecoder(registry));
                     pipeline.addLast("bookieProtoEncoder", new 
BookieProtoEncoding.ResponseEncoder(registry));
@@ -401,7 +398,6 @@ class BookieNettyServer {
                     ChannelPipeline pipeline = ch.pipeline();
 
                     pipeline.addLast("lengthbaseddecoder", new 
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
-                    pipeline.addLast("lengthprepender", new 
LengthFieldPrepender(4));
 
                     pipeline.addLast("bookieProtoDecoder", new 
BookieProtoEncoding.RequestDecoder(registry));
                     pipeline.addLast("bookieProtoEncoder", new 
BookieProtoEncoding.ResponseEncoder(registry));
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 50fdd2f81c..860fb6635f 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -109,12 +109,17 @@ public class BookieProtoEncoding {
             BookieProtocol.Request r = (BookieProtocol.Request) msg;
             if (r instanceof BookieProtocol.AddRequest) {
                 BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
-                int totalHeaderSize = 4 // for the header
-                    + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
-                ByteBuf buf = allocator.buffer(totalHeaderSize);
+                ByteBufList data = ar.getData();
+
+                int totalHeaderSize = 4 // for the request header
+                        + BookieProtocol.MASTER_KEY_LENGTH; // for the master 
key
+
+                int totalPayloadSize = totalHeaderSize + data.readableBytes();
+                ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame 
size */);
+                buf.writeInt(totalPayloadSize); // Frame header
                 buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), r.getFlags()));
                 buf.writeBytes(r.getMasterKey(), 0, 
BookieProtocol.MASTER_KEY_LENGTH);
-                ByteBufList data = ar.getData();
+
                 ar.recycle();
                 data.prepend(buf);
                 return data;
@@ -126,7 +131,8 @@ public class BookieProtoEncoding {
                     totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
                 }
 
-                ByteBuf buf = allocator.buffer(totalHeaderSize);
+                ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame 
size */);
+                buf.writeInt(totalHeaderSize);
                 buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), r.getFlags()));
                 buf.writeLong(r.getLedgerId());
                 buf.writeLong(r.getEntryId());
@@ -139,7 +145,8 @@ public class BookieProtoEncoding {
                 BookkeeperProtocol.AuthMessage am = 
((BookieProtocol.AuthRequest) r).getAuthMessage();
                 int totalHeaderSize = 4; // for request type
                 int totalSize = totalHeaderSize + am.getSerializedSize();
-                ByteBuf buf = allocator.buffer(totalSize);
+                ByteBuf buf = allocator.buffer(totalSize + 4 /* frame size */);
+                buf.writeInt(totalSize);
                 buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), r.getFlags()));
                 ByteBufOutputStream bufStream = new ByteBufOutputStream(buf);
                 am.writeTo(bufStream);
@@ -233,6 +240,8 @@ public class BookieProtoEncoding {
             this.extensionRegistry = extensionRegistry;
         }
 
+        private static final int RESPONSE_HEADERS_SIZE = 24;
+
         @Override
         public Object encode(Object msg, ByteBufAllocator allocator)
                 throws Exception {
@@ -240,7 +249,8 @@ public class BookieProtoEncoding {
                 return msg;
             }
             BookieProtocol.Response r = (BookieProtocol.Response) msg;
-            ByteBuf buf = allocator.buffer(24);
+            ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame 
size */);
+            buf.writerIndex(4); // Leave the placeholder for the frame size
             buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), (short) 0));
 
             try {
@@ -251,19 +261,26 @@ public class BookieProtoEncoding {
 
                     BookieProtocol.ReadResponse rr = 
(BookieProtocol.ReadResponse) r;
                     if (rr.hasData()) {
+                        int frameSize = RESPONSE_HEADERS_SIZE + 
rr.getData().readableBytes();
+                        buf.setInt(0, frameSize);
                         return ByteBufList.get(buf, rr.getData());
                     } else {
+                        buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
                         return buf;
                     }
                 } else if (msg instanceof BookieProtocol.AddResponse) {
                     buf.writeInt(r.getErrorCode());
                     buf.writeLong(r.getLedgerId());
                     buf.writeLong(r.getEntryId());
+                    buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
 
                     return buf;
                 } else if (msg instanceof BookieProtocol.AuthResponse) {
                     BookkeeperProtocol.AuthMessage am = 
((BookieProtocol.AuthResponse) r).getAuthMessage();
-                    return ByteBufList.get(buf, 
Unpooled.wrappedBuffer(am.toByteArray()));
+                    ByteBuf payload = Unpooled.wrappedBuffer(am.toByteArray());
+                    int frameSize = 4 + payload.readableBytes();
+                    buf.setInt(0, frameSize);
+                    return ByteBufList.get(buf, payload);
                 } else {
                     LOG.error("Cannot encode unknown response type {}", 
msg.getClass().getName());
                     return msg;
@@ -356,6 +373,8 @@ public class BookieProtoEncoding {
 
     private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator 
allocator) {
         int size = msg.getSerializedSize();
+        int frameSize = size + 4;
+
         // Protobuf serialization is the last step of the netty pipeline. We 
used to allocate
         // a heap buffer while serializing and pass it down to netty library.
         // In AbstractChannel#filterOutboundMessage(), netty copies that data 
to a direct buffer if
@@ -363,17 +382,18 @@ public class BookieProtoEncoding {
         // Allocating a direct buffer reducing unncessary CPU cycles for 
buffer copies in BK client
         // and also helps alleviate pressure off the GC, since there is less 
memory churn.
         // Bookies aren't usually CPU bound. This change improves READ_ENTRY 
code paths by a small factor as well.
-        ByteBuf buf = allocator.directBuffer(size, size);
+        ByteBuf buf = allocator.directBuffer(frameSize, frameSize);
+        buf.writeInt(size);
 
         try {
-            
msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.readerIndex(), 
size)));
+            
msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.writerIndex(), 
size)));
         } catch (IOException e) {
             // This is in-memory serialization, should not fail
             throw new RuntimeException(e);
         }
 
         // Advance writer idx
-        buf.writerIndex(buf.capacity());
+        buf.writerIndex(frameSize);
         return buf;
     }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 965f3fd1bb..c3b09b856d 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -471,7 +471,8 @@ public interface BookieProtocol {
 
         @Override
         public ReferenceCounted retain() {
-            return data.retain();
+            data.retain();
+            return this;
         }
 
         @Override
@@ -481,12 +482,14 @@ public interface BookieProtocol {
 
         @Override
         public ReferenceCounted touch() {
-            return data.touch();
+            data.touch();
+            return this;
         }
 
         @Override
         public ReferenceCounted touch(Object hint) {
-            return data.touch(hint);
+            data.touch(hint);
+            return this;
         }
 
         @Override
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 1b1092bd39..2cda0d243b 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -52,7 +52,6 @@ import io.netty.channel.unix.Errors.NativeIoException;
 import io.netty.handler.codec.CorruptedFrameException;
 import io.netty.handler.codec.DecoderException;
 import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.flush.FlushConsolidationHandler;
 import io.netty.handler.ssl.SslHandler;
@@ -584,10 +583,9 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             protected void initChannel(Channel ch) throws Exception {
                 ChannelPipeline pipeline = ch.pipeline();
                 pipeline.addLast("consolidation", new 
FlushConsolidationHandler(1024, true));
-                pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
+                pipeline.addLast("bytebufList", ByteBufList.ENCODER);
                 pipeline.addLast("lengthbasedframedecoder",
                         new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 
0, 4));
-                pipeline.addLast("lengthprepender", new 
LengthFieldPrepender(4));
                 pipeline.addLast("bookieProtoEncoder", new 
BookieProtoEncoding.RequestEncoder(extRegistry));
                 pipeline.addLast(
                     "bookieProtoDecoder",
@@ -1273,7 +1271,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
         exceptionCounter.inc();
         if (cause instanceof CorruptedFrameException || cause instanceof 
TooLongFrameException) {
-            LOG.error("Corrupted frame received from bookie: {}", 
ctx.channel().remoteAddress());
+            LOG.error("Corrupted frame received from bookie: {}", 
ctx.channel());
             ctx.close();
             return;
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index d136ff0c58..f74652bb10 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -44,8 +44,6 @@ import java.util.ArrayList;
  * will need to be encoded on the channel. There are 2 utility encoders:
  * <ul>
  * <li>{@link #ENCODER}: regular encode that will write all the buffers in the 
{@link ByteBufList} on the channel</li>
- * <li>{@link #ENCODER_WITH_SIZE}: similar to the previous one, but also 
prepend a 4 bytes size header, once, carrying
- * the size of the readable bytes across all the buffers contained in the 
{@link ByteBufList}</li>
  * </ul>
  *
  * <p>Example:
@@ -307,13 +305,7 @@ public class ByteBufList extends AbstractReferenceCounted {
     /**
      * Encoder for the {@link ByteBufList} that doesn't prepend any size 
header.
      */
-    public static final Encoder ENCODER = new Encoder(false);
-
-    /**
-     * Encoder for the {@link ByteBufList} that will prepend a 4 byte header 
with the size of the whole
-     * {@link ByteBufList} readable bytes.
-     */
-    public static final Encoder ENCODER_WITH_SIZE = new Encoder(true);
+    public static final Encoder ENCODER = new Encoder();
 
     /**
      * {@link ByteBufList} encoder.
@@ -321,26 +313,12 @@ public class ByteBufList extends AbstractReferenceCounted 
{
     @Sharable
     public static class Encoder extends ChannelOutboundHandlerAdapter {
 
-        private final boolean prependSize;
-
-        public Encoder(boolean prependSize) {
-            this.prependSize = prependSize;
-        }
-
         @Override
         public void write(ChannelHandlerContext ctx, Object msg, 
ChannelPromise promise) throws Exception {
             if (msg instanceof ByteBufList) {
                 ByteBufList b = (ByteBufList) msg;
 
                 try {
-                    if (prependSize) {
-                        // Prepend the frame size before writing the buffer 
list, so that we only have 1 single size
-                        // header
-                        ByteBuf sizeBuffer = ctx.alloc().directBuffer(4, 4);
-                        sizeBuffer.writeInt(b.readableBytes());
-                        ctx.write(sizeBuffer, ctx.voidPromise());
-                    }
-
                     // Write each buffer individually on the socket. The 
retain() here is needed to preserve the fact
                     // that ByteBuf are automatically released after a write. 
If the ByteBufPair ref count is increased
                     // and it gets written multiple times, the individual 
buffers refcount should be reflected as well.
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index 537c9fcfc7..bba26f37a2 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -102,9 +102,9 @@ public class BookieProtoEncodingTest {
         }
         assertEquals(0, outList.size());
 
-        v3Decoder.channelRead(
-            ctx,
-            v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
+        ByteBuf serWithFrameSize = (ByteBuf) v3Encoder.encode(v3Resp, 
UnpooledByteBufAllocator.DEFAULT);
+        ByteBuf ser = serWithFrameSize.slice(4, 
serWithFrameSize.readableBytes() - 4);
+        v3Decoder.channelRead(ctx, ser);
         assertEquals(1, outList.size());
     }
 

Reply via email to