This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-4.14
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/branch-4.14 by this push:
new 5037159d8d Avoid extra buffer to prepend frame size (#3560)
5037159d8d is described below
commit 5037159d8dbee3dca098565fdf12b2dd22eec51d
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Oct 31 21:05:42 2022 -0700
Avoid extra buffer to prepend frame size (#3560)
* Avoid extra buffer to prepend frame size
* Fixed checkstyle
* Fixed touch methods on ReadResponse
* Fixed frame size for protobuf requests
* Removed unwanted changes
* Fixed AuthResponse in v2 protocol
* Fixed test
(cherry picked from commit 02be9d2ee28cd7c4a1895fc8494db96381484a20)
---
.../apache/bookkeeper/proto/BookieNettyServer.java | 6 +---
.../bookkeeper/proto/BookieProtoEncoding.java | 42 ++++++++++++++++------
.../apache/bookkeeper/proto/BookieProtocol.java | 9 +++--
.../bookkeeper/proto/PerChannelBookieClient.java | 6 ++--
.../org/apache/bookkeeper/util/ByteBufList.java | 24 +------------
.../bookkeeper/proto/BookieProtoEncodingTest.java | 6 ++--
6 files changed, 44 insertions(+), 49 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
index 1255c098f5..e391c2452e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieNettyServer.java
@@ -50,7 +50,6 @@ import io.netty.channel.local.LocalServerChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
import io.netty.util.concurrent.DefaultThreadFactory;
@@ -336,11 +335,9 @@ class BookieNettyServer {
pipeline.addLast("consolidation", new
FlushConsolidationHandler(1024, true));
- // For ByteBufList, skip the usual LengthFieldPrepender
and have the encoder itself to add it
- pipeline.addLast("bytebufList",
ByteBufList.ENCODER_WITH_SIZE);
+ pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbaseddecoder", new
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new
LengthFieldPrepender(4));
pipeline.addLast("bookieProtoDecoder", new
BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new
BookieProtoEncoding.ResponseEncoder(registry));
@@ -401,7 +398,6 @@ class BookieNettyServer {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("lengthbaseddecoder", new
LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4, 0, 4));
- pipeline.addLast("lengthprepender", new
LengthFieldPrepender(4));
pipeline.addLast("bookieProtoDecoder", new
BookieProtoEncoding.RequestDecoder(registry));
pipeline.addLast("bookieProtoEncoder", new
BookieProtoEncoding.ResponseEncoder(registry));
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 4f7517bb61..88ee09d61f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -109,12 +109,17 @@ public class BookieProtoEncoding {
BookieProtocol.Request r = (BookieProtocol.Request) msg;
if (r instanceof BookieProtocol.AddRequest) {
BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
- int totalHeaderSize = 4 // for the header
- + BookieProtocol.MASTER_KEY_LENGTH; // for the master key
- ByteBuf buf = allocator.buffer(totalHeaderSize);
+ ByteBufList data = ar.getData();
+
+ int totalHeaderSize = 4 // for the request header
+ + BookieProtocol.MASTER_KEY_LENGTH; // for the master
key
+
+ int totalPayloadSize = totalHeaderSize + data.readableBytes();
+ ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame
size */);
+ buf.writeInt(totalPayloadSize); // Frame header
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), r.getFlags()));
buf.writeBytes(r.getMasterKey(), 0,
BookieProtocol.MASTER_KEY_LENGTH);
- ByteBufList data = ar.getData();
+
ar.recycle();
data.prepend(buf);
return data;
@@ -126,7 +131,8 @@ public class BookieProtoEncoding {
totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
}
- ByteBuf buf = allocator.buffer(totalHeaderSize);
+ ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame
size */);
+ buf.writeInt(totalHeaderSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), r.getFlags()));
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
@@ -139,7 +145,8 @@ public class BookieProtoEncoding {
BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthRequest) r).getAuthMessage();
int totalHeaderSize = 4; // for request type
int totalSize = totalHeaderSize + am.getSerializedSize();
- ByteBuf buf = allocator.buffer(totalSize);
+ ByteBuf buf = allocator.buffer(totalSize + 4 /* frame size */);
+ buf.writeInt(totalSize);
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), r.getFlags()));
ByteBufOutputStream bufStream = new ByteBufOutputStream(buf);
am.writeTo(bufStream);
@@ -233,6 +240,8 @@ public class BookieProtoEncoding {
this.extensionRegistry = extensionRegistry;
}
+ private static final int RESPONSE_HEADERS_SIZE = 24;
+
@Override
public Object encode(Object msg, ByteBufAllocator allocator)
throws Exception {
@@ -240,7 +249,8 @@ public class BookieProtoEncoding {
return msg;
}
BookieProtocol.Response r = (BookieProtocol.Response) msg;
- ByteBuf buf = allocator.buffer(24);
+ ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame
size */);
+ buf.writerIndex(4); // Leave the placeholder for the frame size
buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), (short) 0));
try {
@@ -251,19 +261,26 @@ public class BookieProtoEncoding {
BookieProtocol.ReadResponse rr =
(BookieProtocol.ReadResponse) r;
if (rr.hasData()) {
+ int frameSize = RESPONSE_HEADERS_SIZE +
rr.getData().readableBytes();
+ buf.setInt(0, frameSize);
return ByteBufList.get(buf, rr.getData());
} else {
+ buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
return buf;
}
} else if (msg instanceof BookieProtocol.AddResponse) {
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
+ buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthResponse) r).getAuthMessage();
- return ByteBufList.get(buf,
Unpooled.wrappedBuffer(am.toByteArray()));
+ ByteBuf payload = Unpooled.wrappedBuffer(am.toByteArray());
+ int frameSize = 4 + payload.readableBytes();
+ buf.setInt(0, frameSize);
+ return ByteBufList.get(buf, payload);
} else {
LOG.error("Cannot encode unknown response type {}",
msg.getClass().getName());
return msg;
@@ -356,6 +373,8 @@ public class BookieProtoEncoding {
private static ByteBuf serializeProtobuf(MessageLite msg, ByteBufAllocator
allocator) {
int size = msg.getSerializedSize();
+ int frameSize = size + 4;
+
// Protobuf serialization is the last step of the netty pipeline. We
used to allocate
// a heap buffer while serializing and pass it down to netty library.
// In AbstractChannel#filterOutboundMessage(), netty copies that data
to a direct buffer if
@@ -363,17 +382,18 @@ public class BookieProtoEncoding {
// Allocating a direct buffer reducing unncessary CPU cycles for
buffer copies in BK client
// and also helps alleviate pressure off the GC, since there is less
memory churn.
// Bookies aren't usually CPU bound. This change improves READ_ENTRY
code paths by a small factor as well.
- ByteBuf buf = allocator.directBuffer(size, size);
+ ByteBuf buf = allocator.directBuffer(frameSize, frameSize);
+ buf.writeInt(size);
try {
-
msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.readerIndex(),
size)));
+
msg.writeTo(CodedOutputStream.newInstance(buf.nioBuffer(buf.writerIndex(),
size)));
} catch (IOException e) {
// This is in-memory serialization, should not fail
throw new RuntimeException(e);
}
// Advance writer idx
- buf.writerIndex(buf.capacity());
+ buf.writerIndex(frameSize);
return buf;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 8378b44bf7..7fafeba59d 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -466,7 +466,8 @@ public interface BookieProtocol {
@Override
public ReferenceCounted retain() {
- return data.retain();
+ data.retain();
+ return this;
}
@Override
@@ -476,12 +477,14 @@ public interface BookieProtocol {
@Override
public ReferenceCounted touch() {
- return data.touch();
+ data.touch();
+ return this;
}
@Override
public ReferenceCounted touch(Object hint) {
- return data.touch(hint);
+ data.touch(hint);
+ return this;
}
@Override
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index afc4934289..01f8a8154f 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -51,7 +51,6 @@ import io.netty.channel.unix.Errors.NativeIoException;
import io.netty.handler.codec.CorruptedFrameException;
import io.netty.handler.codec.DecoderException;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
-import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.TooLongFrameException;
import io.netty.handler.flush.FlushConsolidationHandler;
import io.netty.handler.ssl.SslHandler;
@@ -576,10 +575,9 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("consolidation", new
FlushConsolidationHandler(1024, true));
- pipeline.addLast("bytebufList", ByteBufList.ENCODER_WITH_SIZE);
+ pipeline.addLast("bytebufList", ByteBufList.ENCODER);
pipeline.addLast("lengthbasedframedecoder",
new LengthFieldBasedFrameDecoder(maxFrameSize, 0, 4,
0, 4));
- pipeline.addLast("lengthprepender", new
LengthFieldPrepender(4));
pipeline.addLast("bookieProtoEncoder", new
BookieProtoEncoding.RequestEncoder(extRegistry));
pipeline.addLast(
"bookieProtoDecoder",
@@ -1265,7 +1263,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
exceptionCounter.inc();
if (cause instanceof CorruptedFrameException || cause instanceof
TooLongFrameException) {
- LOG.error("Corrupted frame received from bookie: {}",
ctx.channel().remoteAddress());
+ LOG.error("Corrupted frame received from bookie: {}",
ctx.channel());
ctx.close();
return;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
index d136ff0c58..f74652bb10 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/ByteBufList.java
@@ -44,8 +44,6 @@ import java.util.ArrayList;
* will need to be encoded on the channel. There are 2 utility encoders:
* <ul>
* <li>{@link #ENCODER}: regular encode that will write all the buffers in the
{@link ByteBufList} on the channel</li>
- * <li>{@link #ENCODER_WITH_SIZE}: similar to the previous one, but also
prepend a 4 bytes size header, once, carrying
- * the size of the readable bytes across all the buffers contained in the
{@link ByteBufList}</li>
* </ul>
*
* <p>Example:
@@ -307,13 +305,7 @@ public class ByteBufList extends AbstractReferenceCounted {
/**
* Encoder for the {@link ByteBufList} that doesn't prepend any size
header.
*/
- public static final Encoder ENCODER = new Encoder(false);
-
- /**
- * Encoder for the {@link ByteBufList} that will prepend a 4 byte header
with the size of the whole
- * {@link ByteBufList} readable bytes.
- */
- public static final Encoder ENCODER_WITH_SIZE = new Encoder(true);
+ public static final Encoder ENCODER = new Encoder();
/**
* {@link ByteBufList} encoder.
@@ -321,26 +313,12 @@ public class ByteBufList extends AbstractReferenceCounted
{
@Sharable
public static class Encoder extends ChannelOutboundHandlerAdapter {
- private final boolean prependSize;
-
- public Encoder(boolean prependSize) {
- this.prependSize = prependSize;
- }
-
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
if (msg instanceof ByteBufList) {
ByteBufList b = (ByteBufList) msg;
try {
- if (prependSize) {
- // Prepend the frame size before writing the buffer
list, so that we only have 1 single size
- // header
- ByteBuf sizeBuffer = ctx.alloc().directBuffer(4, 4);
- sizeBuffer.writeInt(b.readableBytes());
- ctx.write(sizeBuffer, ctx.voidPromise());
- }
-
// Write each buffer individually on the socket. The
retain() here is needed to preserve the fact
// that ByteBuf are automatically released after a write.
If the ByteBufPair ref count is increased
// and it gets written multiple times, the individual
buffers refcount should be reflected as well.
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index 537c9fcfc7..bba26f37a2 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -102,9 +102,9 @@ public class BookieProtoEncodingTest {
}
assertEquals(0, outList.size());
- v3Decoder.channelRead(
- ctx,
- v3Encoder.encode(v3Resp, UnpooledByteBufAllocator.DEFAULT));
+ ByteBuf serWithFrameSize = (ByteBuf) v3Encoder.encode(v3Resp,
UnpooledByteBufAllocator.DEFAULT);
+ ByteBuf ser = serWithFrameSize.slice(4,
serWithFrameSize.readableBytes() - 4);
+ v3Decoder.channelRead(ctx, ser);
assertEquals(1, outList.size());
}