This is an automated email from the ASF dual-hosted git repository. yong pushed a commit to branch branch-4.15 in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
commit ed55d4184ee949e2a56459ac4aacf05410d3ac43 Author: Matteo Merli <[email protected]> AuthorDate: Mon Oct 31 21:05:42 2022 -0700 Avoid extra buffer to prepend frame size (#3560) * Avoid extra buffer to prepend frame size * Fixed checkstyle * Fixed touch methods on ReadResponse * Fixed frame size for protobuf requests * Removed unwanted changes * Fixed AuthResponse in v2 protocol * Fixed test (cherry picked from commit 02be9d2ee28cd7c4a1895fc8494db96381484a20) --- .../apache/bookkeeper/proto/BookieNettyServer.java | 6 +--- .../bookkeeper/proto/BookieProtoEncoding.java | 42 ++++++++++++++++------ .../apache/bookkeeper/proto/BookieProtocol.java | 9 +++-- .../bookkeeper/proto/PerChannelBookieClient.java | 6 ++-- .../org/apache/bookkeeper/util/ByteBufList.java | 24 +------------ .../bookkeeper/proto/BookieProtoEncodingTest.java | 6 ++-- 6 files changed, 44 insertions(+), 49 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java index b301bd91ba..7b70a12f3a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java @@ -49,7 +49,6 @@ import io.netty.channel.local.LocalServerChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslHandler; import io.netty.util.concurrent.DefaultThreadFactory; @@ -332,11 +331,9 @@ class BookieNettyServer { pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); - // For ByteBufList, skip the usual LengthFieldPrepender and have the encoder itself to add it - pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE); + pipeline.addLast("bytebufList", ByteBufList.ENCODER); pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); - pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry)); pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry)); @@ -401,7 +398,6 @@ class BookieNettyServer { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("lengthbaseddecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); - pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); pipeline.addLast("bookieProtoDecoder", new BookieProtoEncoding.RequestDecoder(registry)); pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.ResponseEncoder(registry)); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java index 50fdd2f81c..860fb6635f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java @@ -109,12 +109,17 @@ public class BookieProtoEncoding { BookieProtocol.Request r = (BookieProtocol.Request) msg; if (r instanceof BookieProtocol.AddRequest) { BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r; - int totalHeaderSize = 4 // for the header - + BookieProtocol.MASTER_KEY_LENGTH; // for the master key - ByteBuf buf = allocator.buffer(totalHeaderSize); + ByteBufList data = ar.getData(); + + int totalHeaderSize = 4 // for the request header + + BookieProtocol.MASTER_KEY_LENGTH; // for the master key + + int totalPayloadSize = totalHeaderSize + data.readableBytes(); + ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */); + buf.writeInt(totalPayloadSize); // Frame header buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags())); buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH); - ByteBufList data = ar.getData(); + ar.recycle(); data.prepend(buf); return data; @@ -126,7 +131,8 @@ public class BookieProtoEncoding { totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH; } - ByteBuf buf = allocator.buffer(totalHeaderSize); + ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */); + buf.writeInt(totalHeaderSize); buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags())); buf.writeLong(r.getLedgerId()); buf.writeLong(r.getEntryId()); @@ -139,7 +145,8 @@ public class BookieProtoEncoding { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthRequest) r).getAuthMessage(); int totalHeaderSize = 4; // for request type int totalSize = totalHeaderSize + am.getSerializedSize(); - ByteBuf buf = allocator.buffer(totalSize); + ByteBuf buf = allocator.buffer(totalSize + 4 /* frame size */); + buf.writeInt(totalSize); buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags())); ByteBufOutputStream bufStream = new ByteBufOutputStream(buf); am.writeTo(bufStream); @@ -233,6 +240,8 @@ public class BookieProtoEncoding { this.extensionRegistry = extensionRegistry; } + private static final int RESPONSE_HEADERS_SIZE = 24; + @Override public Object encode(Object msg, ByteBufAllocator allocator) throws Exception { @@ -240,7 +249,8 @@ public class BookieProtoEncoding { return msg; } BookieProtocol.Response r = (BookieProtocol.Response) msg; - ByteBuf buf = allocator.buffer(24); + ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame size */); + buf.writerIndex(4); // Leave the placeholder for the frame size buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0)); try { @@ -251,19 +261,26 @@ public class BookieProtoEncoding { BookieProtocol.ReadResponse rr = (BookieProtocol.ReadResponse) r; if (rr.hasData()) { + int frameSize = RESPONSE_HEADERS_SIZE + rr.getData().readableBytes(); + buf.setInt(0, frameSize); return ByteBufList.get(buf, rr.getData()); } else { + buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size return buf; } } else if (msg instanceof BookieProtocol.AddResponse) { buf.writeInt(r.getErrorCode()); buf.writeLong(r.getLedgerId()); buf.writeLong(r.getEntryId()); + buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size return buf; } else if (msg instanceof BookieProtocol.AuthResponse) { BookkeeperProtocol.AuthMessage am = ((BookieProtocol.AuthResponse) r).getAuthMessage(); - return ByteBufList.get(buf, Unpooled.wrappedBuffer(am.toByteArray())); + ByteBuf payload = Unpooled.wrappedBuffer(am.toByteArray()); + int frameSize = 4 + payload.readableBytes(); + buf.setInt(0, frameSize); + return ByteBufList.get(buf, payload); } else { LOG.error("Cannot encode unknown response type {}", msg.getClass().getName()); return msg; @@ -356,6 +373,8 @@ public class BookieProtoEncoding { private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator allocator) { int size = msg.getSerializedSize(); + int frameSize = size + 4; + // Protobuf serialization is the last step of the netty pipeline. We used to allocate // a heap buffer while serializing and pass it down to netty library. // In AbstractChannel#filterOutboundMessage(), netty copies that data to a direct buffer if @@ -363,17 +382,18 @@ public class BookieProtoEncoding { // Allocating a direct buffer reducing unncessary CPU cycles for buffer copies in BK client // and also helps alleviate pressure off the GC, since there is less memory churn. // Bookies aren't usually CPU bound. This change improves READ_ENTRY code paths by a small factor as well. - ByteBuf buf = allocator.directBuffer(size, size); + ByteBuf buf = allocator.directBuffer(frameSize, frameSize); + buf.writeInt(size); try { - msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.readerIndex(), size))); + msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.writerIndex(), size))); } catch (IOException e) { // This is in-memory serialization, should not fail throw new RuntimeException(e); } // Advance writer idx - buf.writerIndex(buf.capacity()); + buf.writerIndex(frameSize); return buf; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java index 965f3fd1bb..c3b09b856d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java @@ -471,7 +471,8 @@ public interface BookieProtocol { @Override public ReferenceCounted retain() { - return data.retain(); + data.retain(); + return this; } @Override @@ -481,12 +482,14 @@ public interface BookieProtocol { @Override public ReferenceCounted touch() { - return data.touch(); + data.touch(); + return this; } @Override public ReferenceCounted touch(Object hint) { - return data.touch(hint); + data.touch(hint); + return this; } @Override diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java index 1b1092bd39..2cda0d243b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java @@ -52,7 +52,6 @@ import io.netty.channel.unix.Errors.NativeIoException; import io.netty.handler.codec.CorruptedFrameException; import io.netty.handler.codec.DecoderException; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; -import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.TooLongFrameException; import io.netty.handler.flush.FlushConsolidationHandler; import io.netty.handler.ssl.SslHandler; @@ -584,10 +583,9 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("consolidation", new FlushConsolidationHandler(1024, true)); - pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE); + pipeline.addLast("bytebufList", ByteBufList.ENCODER); pipeline.addLast("lengthbasedframedecoder", new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4)); - pipeline.addLast("lengthprepender", new LengthFieldPrepender(4)); pipeline.addLast("bookieProtoEncoder", new BookieProtoEncoding.RequestEncoder(extRegistry)); pipeline.addLast( "bookieProtoDecoder", @@ -1273,7 +1271,7 @@ public class PerChannelBookieClient extends ChannelInboundHandlerAdapter { public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { exceptionCounter.inc(); if (cause instanceof CorruptedFrameException || cause instanceof TooLongFrameException) { - LOG.error("Corrupted frame received from bookie: {}", ctx.channel().remoteAddress()); + LOG.error("Corrupted frame received from bookie: {}", ctx.channel()); ctx.close(); return; } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java index d136ff0c58..f74652bb10 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java @@ -44,8 +44,6 @@ import java.util.ArrayList; * will need to be encoded on the channel. There are 2 utility encoders: * <ul> * <li>{@link #ENCODER}: regular encode that will write all the buffers in the {@link ByteBufList} on the channel</li> - * <li>{@link #ENCODER_WITH_SIZE}: similar to the previous one, but also prepend a 4 bytes size header, once, carrying - * the size of the readable bytes across all the buffers contained in the {@link ByteBufList}</li> * </ul> * * <p>Example: @@ -307,13 +305,7 @@ public class ByteBufList extends AbstractReferenceCounted { /** * Encoder for the {@link ByteBufList} that doesn't prepend any size header. */ - public static final Encoder ENCODER = new Encoder(false); - - /** - * Encoder for the {@link ByteBufList} that will prepend a 4 byte header with the size of the whole - * {@link ByteBufList} readable bytes. - */ - public static final Encoder ENCODER_WITH_SIZE = new Encoder(true); + public static final Encoder ENCODER = new Encoder(); /** * {@link ByteBufList} encoder. @@ -321,26 +313,12 @@ public class ByteBufList extends AbstractReferenceCounted { @Sharable public static class Encoder extends ChannelOutboundHandlerAdapter { - private final boolean prependSize; - - public Encoder(boolean prependSize) { - this.prependSize = prependSize; - } - @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if (msg instanceof ByteBufList) { ByteBufList b = (ByteBufList) msg; try { - if (prependSize) { - // Prepend the frame size before writing the buffer list, so that we only have 1 single size - // header - ByteBuf sizeBuffer = ctx.alloc().directBuffer(4, 4); - sizeBuffer.writeInt(b.readableBytes()); - ctx.write(sizeBuffer, ctx.voidPromise()); - } - // Write each buffer individually on the socket. The retain() here is needed to preserve the fact // that ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased // and it gets written multiple times, the individual buffers refcount should be reflected as well. diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java index 537c9fcfc7..bba26f37a2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java @@ -102,9 +102,9 @@ public class BookieProtoEncodingTest { } assertEquals(0, outList.size()); - v3Decoder.channelRead( - ctx, - v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT)); + ByteBuf serWithFrameSize = (ByteBuf) v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT); + ByteBuf ser = serWithFrameSize.slice(4, serWithFrameSize.readableBytes() - 4); + v3Decoder.channelRead(ctx, ser); assertEquals(1, outList.size()); }
