This is an automated email from the ASF dual-hosted git repository.

chenhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 92e41c1493 [BP-62] Batch read protocol change: Support encoding and decoding for batchReadRequest and batchReadResponse (#4177)
92e41c1493 is described below

commit 92e41c14934cf2bd8eef470ad35f0f9660886737
Author: Yan Zhao <[email protected]>
AuthorDate: Tue Jan 23 10:15:06 2024 +0800

    [BP-62] Batch read protocol change: Support encoding and decoding for batchReadRequest and batchReadResponse (#4177)
    
    ### Motivation
    This is the first PR for the batch read (#4051) feature.
    
    Protocol change: Support encoding and decoding for batchReadRequest and batchReadResponse
---
 .../bookkeeper/proto/BookieProtoEncoding.java      |  93 +++++++++++++-
 .../apache/bookkeeper/proto/BookieProtocol.java    | 139 ++++++++++++++++++++-
 .../bookkeeper/proto/BookieProtoEncodingTest.java  |  34 +++++
 3 files changed, 264 insertions(+), 2 deletions(-)

diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index c56235dbe6..a93b63f1f6 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -110,7 +110,30 @@ public class BookieProtoEncoding {
                 return msg;
             }
             BookieProtocol.Request r = (BookieProtocol.Request) msg;
-            if (r instanceof BookieProtocol.ReadRequest) {
+            if (r instanceof BookieProtocol.BatchedReadRequest) {
+                int totalHeaderSize = 4 // for request type
+                        + 8 // for ledger id
+                        + 8 // for entry id
+                        + 8 // for request id
+                        + 4 // for max count
+                        + 8; // for max size
+                if (r.hasMasterKey()) {
+                    totalHeaderSize += BookieProtocol.MASTER_KEY_LENGTH;
+                }
+                ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame size */);
+                buf.writeInt(totalHeaderSize);
+                buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), r.getFlags()));
+                buf.writeLong(r.getLedgerId());
+                buf.writeLong(r.getEntryId());
+                buf.writeLong(((BookieProtocol.BatchedReadRequest) r).getRequestId());
+                buf.writeInt(((BookieProtocol.BatchedReadRequest) r).getMaxCount());
+                buf.writeLong(((BookieProtocol.BatchedReadRequest) r).getMaxSize());
+                if (r.hasMasterKey()) {
+                    buf.writeBytes(r.getMasterKey(), 0, BookieProtocol.MASTER_KEY_LENGTH);
+                }
+                r.recycle();
+                return buf;
+            } else if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
                     + 8; // for entryId
@@ -181,6 +204,21 @@ public class BookieProtoEncoding {
                 } else {
                     return BookieProtocol.ReadRequest.create(version, ledgerId, entryId, flags, null);
                 }
+            case BookieProtocol.BATCH_READ_ENTRY:
+                ledgerId = packet.readLong();
+                entryId = packet.readLong();
+                long requestId = packet.readLong();
+                int maxCount = packet.readInt();
+                long maxSize = packet.readLong();
+                if ((flags & BookieProtocol.FLAG_DO_FENCING) == BookieProtocol.FLAG_DO_FENCING
+                        && version >= 2) {
+                    byte[] masterKey = readMasterKey(packet);
+                    return BookieProtocol.BatchedReadRequest.create(version, ledgerId, entryId, flags, masterKey,
+                            requestId, maxCount, maxSize);
+                } else {
+                    return BookieProtocol.BatchedReadRequest.create(version, ledgerId, entryId, flags, null,
+                            requestId, maxCount, maxSize);
+                }
             case BookieProtocol.AUTH:
                 BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
                 builder.mergeFrom(new ByteBufInputStream(packet), extensionRegistry);
@@ -260,6 +298,51 @@ public class BookieProtoEncoding {
                     } else {
                         return ByteBufList.get(buf, rr.getData());
                     }
+                } else if (msg instanceof BookieProtocol.BatchedReadResponse) {
+                    BookieProtocol.BatchedReadResponse brr = (BookieProtocol.BatchedReadResponse) r;
+                    // A response without entries (e.g. an error) may carry null data: encode it as zero entries.
+                    ByteBufList data = brr.getData();
+                    int entryCount = data == null ? 0 : data.size();
+                    int payloadSize = data == null ? 0 : data.readableBytes();
+                    int delimiterSize = entryCount * 4; // The 4-byte size prefix of each entry.
+                    boolean isSmallEntry = (payloadSize + delimiterSize) < SMALL_ENTRY_SIZE_THRESHOLD;
+
+                    int responseSize = RESPONSE_HEADERS_SIZE + 8 /* request_id */ + payloadSize + delimiterSize;
+                    // Large payloads are sent as separate buffers, so the header buffer only needs room for
+                    // the frame size, the headers and the request id.
+                    int bufferSize = 4 /* frame size */ + RESPONSE_HEADERS_SIZE + 8 /* request_id */
+                            + (isSmallEntry ? payloadSize + delimiterSize : 0);
+                    ByteBuf buf = allocator.buffer(bufferSize);
+                    buf.writeInt(responseSize);
+                    buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), r.getOpCode(), (short) 0));
+                    buf.writeInt(r.getErrorCode());
+                    buf.writeLong(r.getLedgerId());
+                    buf.writeLong(r.getEntryId());
+                    buf.writeLong(brr.getRequestId());
+                    if (data == null) {
+                        return buf;
+                    }
+                    if (isSmallEntry) {
+                        for (int i = 0; i < entryCount; i++) {
+                            ByteBuf entryData = data.getBuffer(i);
+                            buf.writeInt(entryData.readableBytes());
+                            buf.writeBytes(entryData);
+                        }
+                        brr.release();
+                        return buf;
+                    }
+                    // Both paths consume the response: each entry gets its own reference in the outgoing
+                    // list, then the response (and with it the list it owns) is released.
+                    ByteBufList byteBufList = ByteBufList.get(buf);
+                    for (int i = 0; i < entryCount; i++) {
+                        ByteBuf entryData = data.getBuffer(i);
+                        ByteBuf entryLengthBuf = allocator.buffer(4);
+                        entryLengthBuf.writeInt(entryData.readableBytes());
+                        byteBufList.add(entryLengthBuf);
+                        byteBufList.add(entryData.retain());
+                    }
+                    brr.release();
+                    return byteBufList;
                 } else if (msg instanceof BookieProtocol.AddResponse) {
                     ByteBuf buf = allocator.buffer(RESPONSE_HEADERS_SIZE + 4 /* frame size */);
                     buf.writeInt(RESPONSE_HEADERS_SIZE);
@@ -309,6 +392,25 @@ public class BookieProtoEncoding {
 
                 return new BookieProtocol.ReadResponse(
                         version, rc, ledgerId, entryId, buffer.retainedSlice());
+            case BookieProtocol.BATCH_READ_ENTRY:
+                rc = buffer.readInt();
+                ledgerId = buffer.readLong();
+                entryId = buffer.readLong();
+                long requestId = buffer.readLong();
+                // Every entry is framed as a 4-byte size followed by its bytes. A response without
+                // entries (e.g. an error) is represented by null data.
+                ByteBufList data = null;
+                while (buffer.isReadable()) {
+                    int entrySize = buffer.readInt();
+                    // The slice holds its own reference to the frame, so the list must not be retained again.
+                    ByteBuf entrySlice = buffer.readRetainedSlice(entrySize);
+                    if (data == null) {
+                        data = ByteBufList.get(entrySlice);
+                    } else {
+                        data.add(entrySlice);
+                    }
+                }
+                return new BookieProtocol.BatchedReadResponse(version, rc, ledgerId, entryId, requestId, data);
             case BookieProtocol.AUTH:
                 ByteBufInputStream bufStream = new ByteBufInputStream(buffer);
                 BookkeeperProtocol.AuthMessage.Builder builder = BookkeeperProtocol.AuthMessage.newBuilder();
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 3a27f08a95..68b8d61988 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -27,6 +27,7 @@ import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ByteBufList;
 
 /**
  * The packets of the Bookie protocol all have a 4-byte integer indicating the
@@ -132,6 +133,7 @@ public interface BookieProtocol {
     byte READ_LAC = 4;
     byte WRITE_LAC = 5;
     byte GET_BOOKIE_INFO = 6;
+    byte BATCH_READ_ENTRY = 7;
 
     /**
      * The error code that indicates success.
@@ -328,6 +330,10 @@ public interface BookieProtocol {
 
         private final Handle<ReadRequest> recyclerHandle;
 
+        private ReadRequest() {
+            recyclerHandle = null;
+        }
+
         private ReadRequest(Handle<ReadRequest> recyclerHandle) {
             this.recyclerHandle = recyclerHandle;
         }
@@ -344,7 +350,74 @@ public interface BookieProtocol {
             ledgerId = -1;
             entryId = -1;
             masterKey = null;
-            recyclerHandle.recycle(this);
+            if (recyclerHandle != null) {
+                recyclerHandle.recycle(this);
+            }
+        }
+    }
+
+    /**
+     * The request for reading data with batch optimization.
+     * The entry_id is the first entry to read; the following entries are read from the same ledger,
+     * i.e. a batch read never spans more than one ledger (ledger_id).
+     */
+    class BatchedReadRequest extends ReadRequest {
+
+        long requestId;
+        int maxCount;
+        long maxSize;
+
+        static BatchedReadRequest create(byte protocolVersion, long ledgerId, long entryId,
+                short flags, byte[] masterKey, long requestId, int maxCount, long maxSize) {
+            BatchedReadRequest request = RECYCLER.get();
+            request.protocolVersion = protocolVersion;
+            request.ledgerId = ledgerId;
+            request.entryId = entryId;
+            request.flags = flags;
+            request.masterKey = masterKey;
+            request.requestId = requestId;
+            request.maxCount = maxCount;
+            request.maxSize = maxSize;
+            request.opCode = BATCH_READ_ENTRY;
+            return request;
+        }
+
+        int getMaxCount() {
+            return maxCount;
+        }
+
+        long getMaxSize() {
+            return maxSize;
+        }
+
+        long getRequestId() {
+            return requestId;
+        }
+
+        private final Handle<BatchedReadRequest> recyclerHandle;
+
+        private BatchedReadRequest(Handle<BatchedReadRequest> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        private static final Recycler<BatchedReadRequest> RECYCLER = new Recycler<BatchedReadRequest>() {
+            @Override
+            protected BatchedReadRequest newObject(Handle<BatchedReadRequest> handle) {
+                return new BatchedReadRequest(handle);
+            }
+        };
+
+        @Override
+        public void recycle() {
+            ledgerId = -1;
+            entryId = -1;
+            masterKey = null;
+            maxCount = -1;
+            maxSize = -1;
+            requestId = -1;
+            if (recyclerHandle != null) {
+                recyclerHandle.recycle(this);
+            }
         }
     }
 
@@ -479,6 +552,85 @@ public interface BookieProtocol {
         }
     }
 
+    /**
+     * The response for batched read.
+     * The entry_id is the first entry of the batch, and all the returned data belongs to the ledger
+     * identified by ledger_id.
+     *
+     * <p>Reference counting is delegated to the {@link ByteBufList} holding the entries. A response
+     * without entries (e.g. an error response) may carry {@code null} data; it then behaves like a
+     * non-releasable empty buffer: {@code refCnt()} is 1 and {@code release()} returns {@code false}.
+     */
+    class BatchedReadResponse extends Response implements ReferenceCounted {
+
+        final long requestId;
+        final ByteBufList data;
+
+        BatchedReadResponse(byte protocolVersion, int errorCode, long ledgerId, long entryId, long requestId,
+                ByteBufList data) {
+            init(protocolVersion, BATCH_READ_ENTRY, errorCode, ledgerId, entryId);
+            this.requestId = requestId;
+            this.data = data;
+        }
+
+        /** Returns the entries of this batch, or {@code null} if the response was built without entries. */
+        ByteBufList getData() {
+            return data;
+        }
+
+        long getRequestId() {
+            return requestId;
+        }
+
+        @Override
+        public int refCnt() {
+            return data == null ? 1 : data.refCnt();
+        }
+
+        @Override
+        public ReferenceCounted retain() {
+            if (data != null) {
+                data.retain();
+            }
+            return this;
+        }
+
+        @Override
+        public ReferenceCounted retain(int increment) {
+            if (data != null) {
+                data.retain(increment);
+            }
+            // Must return this response (not the underlying list), as the ReferenceCounted contract requires.
+            return this;
+        }
+
+        @Override
+        public ReferenceCounted touch() {
+            if (data != null) {
+                data.touch();
+            }
+            return this;
+        }
+
+        @Override
+        public ReferenceCounted touch(Object hint) {
+            if (data != null) {
+                data.touch(hint);
+            }
+            return this;
+        }
+
+        @Override
+        public boolean release() {
+            return data != null && data.release();
+        }
+
+        @Override
+        public boolean release(int decrement) {
+            return data != null && data.release(decrement);
+        }
+    }
+
     /**
      * A response that adds data.
      */
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
index bba26f37a2..4f719ddfc0 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieProtoEncodingTest.java
@@ -19,6 +19,7 @@
 package org.apache.bookkeeper.proto;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.bookkeeper.proto.BookieProtocol.FLAG_NONE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -44,6 +45,7 @@ import org.apache.bookkeeper.proto.BookkeeperProtocol.BKPacketHeader;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.OperationType;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.ProtocolVersion;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.StatusCode;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -131,4 +133,48 @@ public class BookieProtoEncodingTest {
         v2ReqEncoder.decode((ByteBuf) v3ReqEncoder.encode(v3Req, UnpooledByteBufAllocator.DEFAULT));
     }
 
+    @Test
+    public void testV2BatchReadRequest() throws Exception {
+        RequestEnDeCoderPreV3 v2ReqEncoder = new RequestEnDeCoderPreV3(registry);
+        BookieProtocol.BatchedReadRequest req = BookieProtocol.BatchedReadRequest.create(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, 1L, 2L, FLAG_NONE, null, 3L, 10, 1024L);
+        ByteBuf buf = (ByteBuf) v2ReqEncoder.encode(req, UnpooledByteBufAllocator.DEFAULT);
+        buf.readInt(); // Skip the frame size.
+        // encode() recycles the request and decode() may hand back the very same pooled instance,
+        // so compare against the literal values instead of the fields of req.
+        BookieProtocol.BatchedReadRequest reqDecoded = (BookieProtocol.BatchedReadRequest) v2ReqEncoder.decode(buf);
+        assertEquals(BookieProtocol.BATCH_READ_ENTRY, reqDecoded.getOpCode());
+        assertEquals(1L, reqDecoded.ledgerId);
+        assertEquals(2L, reqDecoded.entryId);
+        assertEquals(3L, reqDecoded.requestId);
+        assertEquals(10, reqDecoded.maxCount);
+        assertEquals(1024L, reqDecoded.maxSize);
+        reqDecoded.recycle();
+    }
+
+    @Test
+    public void testV2BatchReadResponse() throws Exception {
+        ResponseEnDeCoderPreV3 v2ResEncoder = new ResponseEnDeCoderPreV3(registry);
+        ByteBuf first = UnpooledByteBufAllocator.DEFAULT.buffer(4).writeInt(10);
+        ByteBuf second = UnpooledByteBufAllocator.DEFAULT.buffer(8).writeLong(10L);
+        ByteBufList data = ByteBufList.get(first, second);
+        BookieProtocol.BatchedReadResponse res = new BookieProtocol.BatchedReadResponse(
+                BookieProtocol.CURRENT_PROTOCOL_VERSION, 1, 2L, 3L, 4L, data);
+        ByteBuf buf = (ByteBuf) v2ResEncoder.encode(res, UnpooledByteBufAllocator.DEFAULT);
+        buf.readInt(); // Skip the frame size.
+        // encode() releases the response data and its pooled ByteBufList may be reused by decode(),
+        // so compare against the literal values instead of the fields of res.
+        BookieProtocol.BatchedReadResponse resDecoded = (BookieProtocol.BatchedReadResponse) v2ResEncoder.decode(buf);
+        assertEquals(1, resDecoded.getErrorCode());
+        assertEquals(2L, resDecoded.ledgerId);
+        assertEquals(3L, resDecoded.entryId);
+        assertEquals(4L, resDecoded.getRequestId());
+        assertEquals(2, resDecoded.getData().size());
+        assertEquals(12, resDecoded.getData().readableBytes());
+        assertEquals(10, resDecoded.getData().getBuffer(0).readInt());
+        assertEquals(10L, resDecoded.getData().getBuffer(1).readLong());
+        resDecoded.release();
+        buf.release();
+    }
+
 }

Reply via email to