This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 396ec12dc7 Optimize ReadResponse for small entry sizes (#3597)
396ec12dc7 is described below
commit 396ec12dc747fc21105d65f13d2dedace247d1a2
Author: Matteo Merli <[email protected]>
AuthorDate: Wed Nov 2 10:42:51 2022 -0700
Optimize ReadResponse for small entry sizes (#3597)
* Optimize ReadResponse for small entry sizes
* Fixed checkstyle
---
.../bookkeeper/proto/BookieProtoEncoding.java | 49 ++++++++++++++--------
1 file changed, 32 insertions(+), 17 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 70962dfd63..d804a41b0e 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -28,7 +28,6 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
@@ -51,6 +50,13 @@ import org.slf4j.LoggerFactory;
public class BookieProtoEncoding {
private static final Logger LOG =
LoggerFactory.getLogger(BookieProtoEncoding.class);
+ /**
+ * Threshold under which an entry is considered to be "small".
+ *
+ * Small entries payloads are copied instead of being passed around as
references.
+ */
+ public static final int SMALL_ENTRY_SIZE_THRESHOLD = 16 * 1024;
+
/**
* An encoder/decoder interface for the Bookkeeper protocol.
*/
@@ -246,38 +252,47 @@ public class BookieProtoEncoding {
return msg;
}
BookieProtocol.Response r = (BookieProtocol.Response) msg;
- ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame
size */);
- buf.writerIndex(4); // Leave the placeholder for the frame size
- buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), (short) 0));
try {
if (msg instanceof BookieProtocol.ReadResponse) {
+ BookieProtocol.ReadResponse rr =
(BookieProtocol.ReadResponse) r;
+ int payloadSize = rr.getData().readableBytes();
+ int responseSize = RESPONSE_HEADERS_SIZE + payloadSize;
+ boolean isSmallEntry = payloadSize <
SMALL_ENTRY_SIZE_THRESHOLD;
+
+ int bufferSize = 4 /* frame size */ + RESPONSE_HEADERS_SIZE
+ + (isSmallEntry ? payloadSize : 0);
+ ByteBuf buf = allocator.buffer(bufferSize);
+ buf.writeInt(responseSize);
+ buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), (short) 0));
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
- BookieProtocol.ReadResponse rr =
(BookieProtocol.ReadResponse) r;
- if (rr.hasData()) {
- int frameSize = RESPONSE_HEADERS_SIZE +
rr.getData().readableBytes();
- buf.setInt(0, frameSize);
- return ByteBufList.get(buf, rr.getData());
- } else {
- buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
+ if (isSmallEntry) {
+ buf.writeBytes(rr.getData());
return buf;
+ } else {
+ return ByteBufList.get(buf, rr.getData());
}
} else if (msg instanceof BookieProtocol.AddResponse) {
+ ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4
/* frame size */);
+ buf.writeInt(RESPONSE_HEADERS_SIZE);
+ buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), (short) 0));
buf.writeInt(r.getErrorCode());
buf.writeLong(r.getLedgerId());
buf.writeLong(r.getEntryId());
- buf.setInt(0, RESPONSE_HEADERS_SIZE); // Frame size
-
return buf;
} else if (msg instanceof BookieProtocol.AuthResponse) {
BookkeeperProtocol.AuthMessage am =
((BookieProtocol.AuthResponse) r).getAuthMessage();
- ByteBuf payload = Unpooled.wrappedBuffer(am.toByteArray());
- int frameSize = 4 + payload.readableBytes();
- buf.setInt(0, frameSize);
- return ByteBufList.get(buf, payload);
+ int payloadSize = 4 + am.getSerializedSize();
+ int bufferSize = payloadSize + 4 /* frame size */;
+
+ ByteBuf buf = allocator.buffer(bufferSize);
+ buf.writeInt(payloadSize);
+ buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), (short) 0));
+ buf.writeBytes(am.toByteArray());
+ return buf;
} else {
LOG.error("Cannot encode unknown response type {}",
msg.getClass().getName());
return msg;