This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 92e41c1493 [BP-62] Batch read protocol change: Support encoding and
decoding for batchReadRequest and batchReadResponse (#4177)
92e41c1493 is described below
commit 92e41c14934cf2bd8eef470ad35f0f9660886737
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Jan 23 10:15:06 2024 +0800
[BP-62] Batch read protocol change: Support encoding and decoding for
batchReadRequest and batchReadResponse (#4177)
### Motivation
This is the first PR for the batch read feature (#4051).
Protocol change: support encoding and decoding of batch read requests and
responses (BatchedReadRequest and BatchedReadResponse).
---
.../bookkeeper/proto/BookieProtoEncoding.java | 93 +++++++++++++-
.../apache/bookkeeper/proto/BookieProtocol.java | 139 ++++++++++++++++++++-
.../bookkeeper/proto/BookieProtoEncodingTest.java | 34 +++++
3 files changed, 264 insertions(+), 2 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index c56235dbe6..a93b63f1f6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -110,7 +110,29 @@ public class BookieProtoEncoding {
return msg;
}
BookieProtocol.Request r = (BookieProtocol.Request) msg;
- if (r instanceof BookieProtocol.ReadRequest) {
+ // BatchedReadRequest extends ReadRequest, so it must be matched before the ReadRequest branch.
+ if (r instanceof BookieProtocol.BatchedReadRequest) {
+ BookieProtocol.BatchedReadRequest batchRead = (BookieProtocol.BatchedReadRequest) r;
+ // header(4) ledgerId(8) entryId(8) requestId(8) maxCount(4) maxSize(8), then the optional master key.
+ int headerLength = 4 + 8 + 8 + 8 + 4 + 8;
+ boolean withMasterKey = batchRead.hasMasterKey();
+ if (withMasterKey) {
+ headerLength += BookieProtocol.MASTER_KEY_LENGTH;
+ }
+ ByteBuf frame = allocator.buffer(4 /* frame size */ + headerLength);
+ frame.writeInt(headerLength);
+ frame.writeInt(PacketHeader.toInt(batchRead.getProtocolVersion(), batchRead.getOpCode(), batchRead.getFlags()));
+ frame.writeLong(batchRead.getLedgerId());
+ frame.writeLong(batchRead.getEntryId());
+ frame.writeLong(batchRead.getRequestId());
+ frame.writeInt(batchRead.getMaxCount());
+ frame.writeLong(batchRead.getMaxSize());
+ if (withMasterKey) {
+ frame.writeBytes(batchRead.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
+ }
+ batchRead.recycle();
+ return frame;
+ } else if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
+ 8; // for entryId
@@ -181,6 +204,18 @@ public class BookieProtoEncoding {
} else {
return BookieProtocol.ReadRequest.create(version,
ledgerId, entryId, flags, null);
}
+ case BookieProtocol.BATCH_READ_ENTRY: {
+ ledgerId = packet.readLong();
+ entryId = packet.readLong();
+ long requestId = packet.readLong();
+ int maxCount = packet.readInt();
+ long maxSize = packet.readLong();
+ // The master key is only read for fencing requests from protocol version 2 onwards.
+ boolean fencing = (flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING;
+ byte[] masterKey = (fencing && version >= 2) ? readMasterKey(packet) : null;
+ return BookieProtocol.BatchedReadRequest.create(version, ledgerId, entryId, flags, masterKey,
+ requestId, maxCount, maxSize);
+ }
case BookieProtocol.AUTH:
BookkeeperProtocol.AuthMessage.Builder builder =
BookkeeperProtocol.AuthMessage.newBuilder();
builder.mergeFrom(new ByteBufInputStream(packet),
extensionRegistry);
@@ -260,6 +298,52 @@ public class BookieProtoEncoding {
} else {
return ByteBufList.get(buf, rr.getData());
}
+ } else if (msg instanceof BookieProtocol.BatchedReadResponse) {
+ BookieProtocol.BatchedReadResponse brr = (BookieProtocol.BatchedReadResponse) r;
+ // A response without entries (e.g. one decoded from an empty frame) has null data.
+ ByteBufList entries = brr.getData();
+ int entryCount = entries == null ? 0 : entries.size();
+ int payloadSize = entries == null ? 0 : entries.readableBytes();
+ int delimiterSize = entryCount * 4; // A 4-byte length prefix precedes each entry.
+ boolean isSmallEntry = (payloadSize + delimiterSize) < SMALL_ENTRY_SIZE_THRESHOLD;
+
+ int headerSize = RESPONSE_HEADERS_SIZE + 8 /* request_id */;
+ int responseSize = headerSize + delimiterSize + payloadSize;
+ // Large entries are sent as separate buffers, so only reserve room for them when copying.
+ int bufferSize = 4 /* frame size */ + (isSmallEntry ? responseSize : headerSize);
+ ByteBuf buf = allocator.buffer(bufferSize);
+ buf.writeInt(responseSize);
+ buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0));
+ buf.writeInt(r.getErrorCode());
+ buf.writeLong(r.getLedgerId());
+ buf.writeLong(r.getEntryId());
+ buf.writeLong(brr.getRequestId());
+ Object encoded;
+ if (isSmallEntry) {
+ for (int i = 0; i < entryCount; i++) {
+ ByteBuf entryData = entries.getBuffer(i);
+ buf.writeInt(entryData.readableBytes());
+ buf.writeBytes(entryData);
+ }
+ encoded = buf;
+ } else {
+ ByteBufList byteBufList = ByteBufList.get(buf);
+ for (int i = 0; i < entryCount; i++) {
+ ByteBuf entryData = entries.getBuffer(i);
+ ByteBuf entryLengthBuf = allocator.buffer(4);
+ entryLengthBuf.writeInt(entryData.readableBytes());
+ byteBufList.add(entryLengthBuf);
+ // Take our own reference: the response's list releases its entries below.
+ byteBufList.add(entryData.retain());
+ }
+ encoded = byteBufList;
+ }
+ // The entries were either copied into buf or retained by byteBufList, so the response's
+ // reference can be dropped in both cases.
+ if (entries != null) {
+ entries.release();
+ }
+ return encoded;
} else if (msg instanceof BookieProtocol.AddResponse) {
ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4
/* frame size */);
buf.writeInt(RESPONSE_HEADERS_SIZE);
@@ -309,6 +381,35 @@ public class BookieProtoEncoding {
return new BookieProtocol.ReadResponse(
version, rc, ledgerId, entryId,
buffer.retainedSlice());
+ case BookieProtocol.BATCH_READ_ENTRY:
+ rc = buffer.readInt();
+ ledgerId = buffer.readLong();
+ entryId = buffer.readLong();
+ long requestId = buffer.readLong();
+ // The rest of the frame is a sequence of (4-byte length, entry bytes) pairs.
+ ByteBufList data = null;
+ try {
+ while (buffer.readableBytes() > 0) {
+ int entrySize = buffer.readInt();
+ // readRetainedSlice() checks entrySize against the readable bytes before retaining.
+ ByteBuf entrySlice = buffer.readRetainedSlice(entrySize);
+ if (data == null) {
+ data = ByteBufList.get(entrySlice);
+ } else {
+ data.add(entrySlice);
+ }
+ }
+ } catch (RuntimeException malformed) {
+ // Truncated or corrupt frame: release the slices retained so far so they do not pin buffer.
+ if (data != null) {
+ data.release();
+ }
+ throw malformed;
+ }
+ // NOTE(review): the extra retain() leaves the list at refCnt 2, while the READENTRY path above
+ // hands out a single reference via retainedSlice(); confirm the consumer releases it twice.
+ return new BookieProtocol.BatchedReadResponse(version, rc, ledgerId, entryId, requestId,
+ data == null ? null : data.retain());
case BookieProtocol.AUTH:
ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
BookkeeperProtocol.AuthMessage.Builder builder =
BookkeeperProtocol.AuthMessage.newBuilder();
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 3a27f08a95..68b8d61988 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -27,6 +27,7 @@ import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ByteBufList;
/**
* The packets of the Bookie protocol all have a 4-byte integer indicating the
@@ -132,6 +133,7 @@ public interface BookieProtocol {
byte READ_LAC = 4;
byte WRITE_LAC = 5;
byte GET_BOOKIE_INFO = 6;
+ byte BATCH_READ_ENTRY = 7; // batch read of entries from one ledger, starting at the request's entry id
/**
* The error code that indicates success.
@@ -328,6 +330,14 @@ public interface BookieProtocol {
private final Handle<ReadRequest> recyclerHandle;
+ /**
+ * Creates an instance without a recycler handle, for subclasses such as BatchedReadRequest
+ * that are pooled through their own Recycler.
+ */
+ private ReadRequest() {
+ this.recyclerHandle = null;
+ }
+
private ReadRequest(Handle<ReadRequest> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
@@ -344,7 +350,79 @@ public interface BookieProtocol {
ledgerId = -1;
entryId = -1;
masterKey = null;
- recyclerHandle.recycle(this);
+ // Instances created by the no-arg constructor have no pool to return to.
+ if (recyclerHandle != null) {
+ recyclerHandle.recycle(this);
+ }
+ }
+ }
+
+ /**
+ * The request for reading data with batch optimization.
+ * The ledger_id and entry_id will be used as start_ledger_id and start_entry_id.
+ * And the batch read operation can only happen on one ledger.
+ */
+ class BatchedReadRequest extends ReadRequest {
+
+ private static final Recycler<BatchedReadRequest> RECYCLER = new Recycler<BatchedReadRequest>() {
+ @Override
+ protected BatchedReadRequest newObject(Handle<BatchedReadRequest> handle) {
+ return new BatchedReadRequest(handle);
+ }
+ };
+
+ private final Handle<BatchedReadRequest> recyclerHandle;
+
+ // Batch-specific fields; the encoder writes them after the entry id in this order.
+ long requestId;
+ int maxCount;
+ long maxSize;
+
+ private BatchedReadRequest(Handle<BatchedReadRequest> recyclerHandle) {
+ this.recyclerHandle = recyclerHandle;
+ }
+
+ /**
+ * Takes an instance from the pool and fills in the request fields, including the op code.
+ */
+ static BatchedReadRequest create(byte protocolVersion, long ledgerId, long entryId,
+ short flags, byte[] masterKey, long requestId, int maxCount, long maxSize) {
+ BatchedReadRequest request = RECYCLER.get();
+ request.opCode = BATCH_READ_ENTRY;
+ request.protocolVersion = protocolVersion;
+ request.ledgerId = ledgerId;
+ request.entryId = entryId;
+ request.flags = flags;
+ request.masterKey = masterKey;
+ request.requestId = requestId;
+ request.maxCount = maxCount;
+ request.maxSize = maxSize;
+ return request;
+ }
+
+ long getRequestId() {
+ return requestId;
+ }
+
+ int getMaxCount() {
+ return maxCount;
+ }
+
+ long getMaxSize() {
+ return maxSize;
+ }
+
+ @Override
+ public void recycle() {
+ requestId = -1;
+ maxCount = -1;
+ maxSize = -1;
+ ledgerId = -1;
+ entryId = -1;
+ masterKey = null;
+ if (recyclerHandle != null) {
+ recyclerHandle.recycle(this);
+ }
}
}
@@ -479,6 +552,85 @@ public interface BookieProtocol {
}
}
+ /**
+ * The response for batched read.
+ * The ledger_id and entry_id will be used as start_ledger_id and start_entry_id.
+ * And all the returned data is from one ledger.
+ *
+ * <p>Reference counting is delegated to {@link #data}. A response may carry no data at all
+ * (the pre-V3 response decoder yields {@code null} data for a frame without entries); such a
+ * response reports a refCnt of 1, treats retain/touch as no-ops and returns false from release.
+ */
+ class BatchedReadResponse extends Response implements ReferenceCounted {
+
+ final long requestId;
+ /** The returned entries in wire order; {@code null} when the response carries none. */
+ final ByteBufList data;
+
+ BatchedReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, long requestId,
+ ByteBufList data) {
+ init(protocolVersion, BATCH_READ_ENTRY, errorCode, ledgerId, entryId);
+ this.requestId = requestId;
+ this.data = data;
+ }
+
+ ByteBufList getData() {
+ return data;
+ }
+
+ long getRequestId() {
+ return requestId;
+ }
+
+ @Override
+ public int refCnt() {
+ return data == null ? 1 : data.refCnt();
+ }
+
+ @Override
+ public ReferenceCounted retain() {
+ if (data != null) {
+ data.retain();
+ }
+ return this;
+ }
+
+ @Override
+ public ReferenceCounted retain(int increment) {
+ if (data != null) {
+ data.retain(increment);
+ }
+ // Return this response rather than the underlying list, consistent with retain().
+ return this;
+ }
+
+ @Override
+ public ReferenceCounted touch() {
+ if (data != null) {
+ data.touch();
+ }
+ return this;
+ }
+
+ @Override
+ public ReferenceCounted touch(Object hint) {
+ if (data != null) {
+ data.touch(hint);
+ }
+ return this;
+ }
+
+ @Override
+ public boolean release() {
+ return data != null && data.release();
+ }
+
+ @Override
+ public boolean release(int decrement) {
+ return data != null && data.release(decrement);
+ }
+ }
+
/**
* A response that adds data.
*/
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index bba26f37a2..4f719ddfc0 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -19,6 +19,7 @@
package org.apache.bookkeeper.proto;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_NONE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
@@ -44,6 +45,7 @@ import
org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.ByteBufList;
import org.junit.Before;
import org.junit.Test;
@@ -131,4 +133,50 @@ public class BookieProtoEncodingTest {
v2ReqEncoder.decode((ByteBuf) v3ReqEncoder.encode(v3Req,
UnpooledByteBufAllocator.DEFAULT));
}
+ @Test
+ public void testV2BatchReadRequest() throws Exception {
+ RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(registry);
+ BookieProtocol.BatchedReadRequest req = BookieProtocol.BatchedReadRequest.create(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, 11L, 22L, FLAG_NONE, null, 33L, 10, 1024L);
+ ByteBuf buf = (ByteBuf) v2ReqEncoder.encode(req, UnpooledByteBufAllocator.DEFAULT);
+ buf.readInt(); // Skip the frame size.
+ BookieProtocol.BatchedReadRequest reqDecoded =
+ (BookieProtocol.BatchedReadRequest) v2ReqEncoder.decode(buf);
+ // encode() recycles req and decode() may get the same pooled instance back, so compare
+ // against the literal inputs; comparing req with reqDecoded could pass trivially.
+ assertEquals(11L, reqDecoded.getLedgerId());
+ assertEquals(22L, reqDecoded.getEntryId());
+ assertEquals(33L, reqDecoded.getRequestId());
+ assertEquals(10, reqDecoded.getMaxCount());
+ assertEquals(1024L, reqDecoded.getMaxSize());
+ reqDecoded.recycle();
+ buf.release();
+ }
+
+ @Test
+ public void testV2BatchReadResponse() throws Exception {
+ ResponseEnDeCoderPreV3 v2ResEncoder = new ResponseEnDeCoderPreV3(registry);
+ ByteBuf first = UnpooledByteBufAllocator.DEFAULT.buffer(4).writeInt(10);
+ ByteBuf second = UnpooledByteBufAllocator.DEFAULT.buffer(8).writeLong(20L);
+ ByteBufList data = ByteBufList.get(first, second);
+ BookieProtocol.BatchedReadResponse res = new BookieProtocol.BatchedReadResponse(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION, 1, 11L, 22L, 33L, data);
+ ByteBuf buf = (ByteBuf) v2ResEncoder.encode(res, UnpooledByteBufAllocator.DEFAULT);
+ buf.readInt(); // Skip the frame size.
+ BookieProtocol.BatchedReadResponse resDecoded =
+ (BookieProtocol.BatchedReadResponse) v2ResEncoder.decode(buf);
+ // encode() releases the response data and decode() may reuse the pooled ByteBufList, so
+ // compare against the literal inputs rather than against res.
+ assertEquals(1, resDecoded.getErrorCode());
+ assertEquals(11L, resDecoded.getLedgerId());
+ assertEquals(22L, resDecoded.getEntryId());
+ assertEquals(33L, resDecoded.getRequestId());
+ assertEquals(2, resDecoded.getData().size());
+ assertEquals(12, resDecoded.getData().readableBytes());
+ assertEquals(10, resDecoded.getData().getBuffer(0).getInt(0));
+ assertEquals(20L, resDecoded.getData().getBuffer(1).getLong(0));
+ resDecoded.release();
+ buf.release();
+ }
+
}