This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 234b817cdb Single buffer for small add requests (#3783)
234b817cdb is described below

commit 234b817cdb4e054887ffd5e42eaed25dc02daf63
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 20 10:38:11 2023 -0700

    Single buffer for small add requests (#3783)
    
    * Single buffer for small add requests
    
    * Fixed checkstyle
    
    * Fixed handling of CompositeByteBuf
    
    * Fixed merge issues
    
    * Fixed merge issues
    
    * WIP
    
    * Fixed test and removed dead code
    
    * Removed unused import
    
    * Fixed BookieJournalTest
    
    * removed unused import
    
    * fix the checkstyle
    
    * fix failed test
    
    * fix failed test
    
    ---------
    
    Co-authored-by: chenhang <[email protected]>
---
 .../client/LedgerFragmentReplicator.java           | 17 +++--
 .../org/apache/bookkeeper/client/PendingAddOp.java |  7 +-
 .../org/apache/bookkeeper/proto/AuthHandler.java   |  6 +-
 .../org/apache/bookkeeper/proto/BookieClient.java  |  3 +-
 .../apache/bookkeeper/proto/BookieClientImpl.java  |  7 +-
 .../bookkeeper/proto/BookieProtoEncoding.java      | 22 ++-----
 .../apache/bookkeeper/proto/BookieProtocol.java    | 53 ---------------
 .../bookkeeper/proto/PerChannelBookieClient.java   | 31 +++++----
 .../bookkeeper/proto/checksum/DigestManager.java   | 75 ++++++++++++++++++++--
 .../org/apache/bookkeeper/client/ClientUtil.java   |  6 +-
 .../bookkeeper/client/LedgerHandleAdapter.java     |  9 +--
 .../bookkeeper/client/MockBookKeeperTestCase.java  |  8 ++-
 .../client/ParallelLedgerRecoveryTest.java         |  7 +-
 .../client/ReadLastConfirmedAndEntryOpTest.java    | 14 ++--
 .../bookkeeper/client/TestPendingReadLacOp.java    | 11 +++-
 .../proto/BookieBackpressureForV2Test.java         |  4 ++
 .../apache/bookkeeper/proto/MockBookieClient.java  | 33 ++++++++--
 .../org/apache/bookkeeper/proto/MockBookies.java   |  4 +-
 .../apache/bookkeeper/proto/ProtocolBenchmark.java | 16 -----
 .../proto/checksum/DigestManagerBenchmark.java     |  5 +-
 20 files changed, 187 insertions(+), 151 deletions(-)

diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 877a3ac300..6b439b0960 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -29,7 +29,9 @@ import static 
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WOR
 import static 
org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;
 
 import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
 import java.util.Enumeration;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -403,17 +405,24 @@ public class LedgerFragmentReplicator {
                 numEntriesRead.inc();
                 numBytesRead.registerSuccessfulValue(dataLength);
 
-                ByteBufList toSend = lh.getDigestManager()
+                ReferenceCounted toSend = lh.getDigestManager()
                         .computeDigestAndPackageForSending(entryId,
                                 lh.getLastAddConfirmed(), entry.getLength(),
-                                Unpooled.wrappedBuffer(data, 0, data.length));
+                                Unpooled.wrappedBuffer(data, 0, data.length),
+                                lh.getLedgerKey(),
+                                0
+                                );
                 if (replicationThrottle != null) {
-                    updateAverageEntrySize(toSend.readableBytes());
+                    if (toSend instanceof ByteBuf) {
+                        updateAverageEntrySize(((ByteBuf) 
toSend).readableBytes());
+                    } else if (toSend instanceof ByteBufList) {
+                        updateAverageEntrySize(((ByteBufList) 
toSend).readableBytes());
+                    }
                 }
                 for (BookieId newBookie : newBookies) {
                     long startWriteEntryTime = MathUtils.nowInNano();
                     bkc.getBookieClient().addEntry(newBookie, lh.getId(),
-                            lh.getLedgerKey(), entryId, 
ByteBufList.clone(toSend),
+                            lh.getLedgerKey(), entryId, toSend,
                             multiWriteCallback, dataLength, 
BookieProtocol.FLAG_RECOVERY_ADD,
                             false, WriteFlag.NONE);
                     writeDataLatency.registerSuccessfulEvent(
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 394c961cbc..51f559a86c 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 import java.util.EnumSet;
 import java.util.HashSet;
 import java.util.List;
@@ -38,7 +39,6 @@ import 
org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
 import org.apache.bookkeeper.client.api.WriteFlag;
 import org.apache.bookkeeper.net.BookieId;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.util.MathUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,7 +56,7 @@ class PendingAddOp implements WriteCallback {
     private static final Logger LOG = 
LoggerFactory.getLogger(PendingAddOp.class);
 
     ByteBuf payload;
-    ByteBufList toSend;
+    ReferenceCounted toSend;
     AddCallbackWithLatency cb;
     Object ctx;
     long entryId;
@@ -242,9 +242,10 @@ class PendingAddOp implements WriteCallback {
         checkNotNull(lh);
         checkNotNull(lh.macManager);
 
+        int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY : 
FLAG_NONE;
         this.toSend = lh.macManager.computeDigestAndPackageForSending(
                 entryId, lh.lastAddConfirmed, currentLedgerLength,
-                payload);
+                payload, lh.ledgerKey, flags);
         // ownership of RefCounted ByteBuf was passed to 
computeDigestAndPackageForSending
         payload = null;
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index ba5dc9948d..f923b61ad5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.proto;
 import static 
org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;
 
 import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelHandlerContext;
@@ -39,6 +40,7 @@ import org.apache.bookkeeper.auth.BookieAuthProvider;
 import org.apache.bookkeeper.auth.ClientAuthProvider;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -358,8 +360,10 @@ class AuthHandler {
                     } else {
                         waitingForAuth.add(msg);
                     }
+                } else if (msg instanceof ByteBuf || msg instanceof 
ByteBufList) {
+                    waitingForAuth.add(msg);
                 } else {
-                    LOG.info("dropping write of message {}", msg);
+                    LOG.info("[{}] dropping write of message {}", 
ctx.channel(), msg);
                 }
             }
         }
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 81be386f7c..938874fac0 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -20,6 +20,7 @@
  */
 package org.apache.bookkeeper.proto;
 
+import io.netty.util.ReferenceCounted;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -139,7 +140,7 @@ public interface BookieClient {
      *                   {@link org.apache.bookkeeper.client.api.WriteFlag}
      */
     void addEntry(BookieId address, long ledgerId, byte[] masterKey,
-                  long entryId, ByteBufList toSend, WriteCallback cb, Object 
ctx,
+                  long entryId, ReferenceCounted toSend, WriteCallback cb, 
Object ctx,
                   int options, boolean allowFastFail, EnumSet<WriteFlag> 
writeFlags);
 
     /**
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index cd11bc17d7..c305a51ea4 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -32,6 +32,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 import io.netty.util.concurrent.DefaultThreadFactory;
 import java.io.IOException;
 import java.util.EnumSet;
@@ -288,7 +289,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
                          final long ledgerId,
                          final byte[] masterKey,
                          final long entryId,
-                         final ByteBufList toSend,
+                         final ReferenceCounted toSend,
                          final WriteCallback cb,
                          final Object ctx,
                          final int options,
@@ -357,7 +358,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
 
         private BookieClientImpl bookieClient;
-        private ByteBufList toSend;
+        private ReferenceCounted toSend;
         private long ledgerId;
         private long entryId;
         private BookieId addr;
@@ -369,7 +370,7 @@ public class BookieClientImpl implements BookieClient, 
PerChannelBookieClientFac
         private EnumSet<WriteFlag> writeFlags;
 
         static ChannelReadyForAddEntryCallback create(
-                BookieClientImpl bookieClient, ByteBufList toSend, long 
ledgerId,
+                BookieClientImpl bookieClient, ReferenceCounted toSend, long 
ledgerId,
                 long entryId, BookieId addr, Object ctx,
                 WriteCallback cb, int options, byte[] masterKey, boolean 
allowFastFail,
                 EnumSet<WriteFlag> writeFlags) {
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 8cf40d2b8e..c56235dbe6 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -110,23 +110,7 @@ public class BookieProtoEncoding {
                 return msg;
             }
             BookieProtocol.Request r = (BookieProtocol.Request) msg;
-            if (r instanceof BookieProtocol.AddRequest) {
-                BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
-                ByteBufList data = ar.getData();
-
-                int totalHeaderSize = 4 // for the request header
-                        + BookieProtocol.MASTER_KEY_LENGTH; // for the master 
key
-
-                int totalPayloadSize = totalHeaderSize + data.readableBytes();
-                ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame 
size */);
-                buf.writeInt(totalPayloadSize); // Frame header
-                buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(), 
r.getOpCode(), r.getFlags()));
-                buf.writeBytes(r.getMasterKey(), 0, 
BookieProtocol.MASTER_KEY_LENGTH);
-
-                ar.recycle();
-                data.prepend(buf);
-                return data;
-            } else if (r instanceof BookieProtocol.ReadRequest) {
+            if (r instanceof BookieProtocol.ReadRequest) {
                 int totalHeaderSize = 4 // for request type
                     + 8 // for ledgerId
                     + 8; // for entryId
@@ -437,7 +421,9 @@ public class BookieProtoEncoding {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Encode request {} to channel {}.", msg, 
ctx.channel());
             }
-            if (msg instanceof BookkeeperProtocol.Request) {
+            if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
+                ctx.write(msg, promise);
+            } else if (msg instanceof BookkeeperProtocol.Request) {
                 ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
             } else if (msg instanceof BookieProtocol.Request) {
                 ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 86c3ed5469..3a27f08a95 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -27,7 +27,6 @@ import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
-import org.apache.bookkeeper.util.ByteBufList;
 
 /**
  * The packets of the Bookie protocol all have a 4-byte integer indicating the
@@ -252,58 +251,6 @@ public interface BookieProtocol {
         public void recycle() {}
     }
 
-    /**
-     * A Request that adds data.
-     */
-    class AddRequest extends Request {
-        ByteBufList data;
-
-        static AddRequest create(byte protocolVersion, long ledgerId,
-                                 long entryId, short flags, byte[] masterKey,
-                                 ByteBufList data) {
-            AddRequest add = RECYCLER.get();
-            add.protocolVersion = protocolVersion;
-            add.opCode = ADDENTRY;
-            add.ledgerId = ledgerId;
-            add.entryId = entryId;
-            add.flags = flags;
-            add.masterKey = masterKey;
-            add.data = data.retain();
-            return add;
-        }
-
-        ByteBufList getData() {
-            // We need to have different ByteBufList instances for each bookie 
write
-            return ByteBufList.clone(data);
-        }
-
-        boolean isRecoveryAdd() {
-            return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
-        }
-
-        private final Handle<AddRequest> recyclerHandle;
-        private AddRequest(Handle<AddRequest> recyclerHandle) {
-            this.recyclerHandle = recyclerHandle;
-        }
-
-        private static final Recycler<AddRequest> RECYCLER = new 
Recycler<AddRequest>() {
-            @Override
-            protected AddRequest newObject(Handle<AddRequest> handle) {
-                return new AddRequest(handle);
-            }
-        };
-
-        @Override
-        public void recycle() {
-            ledgerId = -1;
-            entryId = -1;
-            masterKey = null;
-            ReferenceCountUtil.release(data);
-            data = null;
-            recyclerHandle.recycle(this);
-        }
-    }
-
     /**
      * This is similar to add request, but it used when processing the request 
on the bookie side.
      */
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 4d08faad20..41add30ca5 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -62,6 +62,7 @@ import io.netty.incubator.channel.uring.IOUringSocketChannel;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 import io.netty.util.concurrent.Future;
 import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
@@ -771,7 +772,7 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
      * @param writeFlags
      *          WriteFlags
      */
-    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ByteBufList toSend, WriteCallback cb,
+    void addEntry(final long ledgerId, byte[] masterKey, final long entryId, 
ReferenceCounted toSend, WriteCallback cb,
                   Object ctx, final int options, boolean allowFastFail, final 
EnumSet<WriteFlag> writeFlags) {
         Object request = null;
         CompletionKey completionKey = null;
@@ -782,9 +783,12 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                 return;
             }
             completionKey = acquireV2Key(ledgerId, entryId, 
OperationType.ADD_ENTRY);
-            request = BookieProtocol.AddRequest.create(
-                    BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
-                    (short) options, masterKey, toSend);
+
+            if (toSend instanceof ByteBuf) {
+                request = ((ByteBuf) toSend).retainedDuplicate();
+            } else {
+                request = ByteBufList.clone((ByteBufList) toSend);
+            }
         } else {
             final long txnId = getTxnId();
             completionKey = new V3CompletionKey(txnId, 
OperationType.ADD_ENTRY);
@@ -799,11 +803,14 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
             }
 
             ByteString body = null;
-            if (toSend.hasArray()) {
-                body = UnsafeByteOperations.unsafeWrap(toSend.array(), 
toSend.arrayOffset(), toSend.readableBytes());
+            ByteBufList bufToSend = (ByteBufList) toSend;
+
+            if (bufToSend.hasArray()) {
+                body = UnsafeByteOperations.unsafeWrap(bufToSend.array(), 
bufToSend.arrayOffset(),
+                        bufToSend.readableBytes());
             } else {
-                for (int i = 0; i < toSend.size(); i++) {
-                    ByteString piece = 
UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer());
+                for (int i = 0; i < bufToSend.size(); i++) {
+                    ByteString piece = 
UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
                     // use ByteString.concat to avoid byte[] allocation when 
toSend has multiple ByteBufs
                     body = (body == null) ? piece : body.concat(piece);
                 }
@@ -1143,14 +1150,6 @@ public class PerChannelBookieClient extends 
ChannelInboundHandlerAdapter {
                     StringUtils.requestToString(request));
 
             errorOut(key, BKException.Code.TooManyRequestsException);
-
-            // If the request is a V2 add request, we retained the data's 
reference when creating the AddRequest
-            // object. To avoid the object leak, we need to release the 
reference if we met any errors
-            // before sending it.
-            if (request instanceof BookieProtocol.AddRequest) {
-                BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) 
request;
-                ar.recycle();
-            }
             return;
         }
 
diff --git 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index a97c301311..9f931f731a 100644
--- 
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++ 
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -24,11 +24,14 @@ import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
 import io.netty.util.concurrent.FastThreadLocal;
 import java.security.GeneralSecurityException;
 import java.security.NoSuchAlgorithmException;
 import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookieProtoEncoding;
+import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.slf4j.Logger;
@@ -97,14 +100,76 @@ public abstract class DigestManager {
      * @param data
      * @return
      */
-    public ByteBufList computeDigestAndPackageForSending(long entryId, long 
lastAddConfirmed, long length,
-            ByteBuf data) {
-        ByteBuf headersBuffer;
+    public ReferenceCounted computeDigestAndPackageForSending(long entryId, 
long lastAddConfirmed, long length,
+                                                              ByteBuf data, 
byte[] masterKey, int flags) {
         if (this.useV2Protocol) {
-            headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
+            return computeDigestAndPackageForSendingV2(entryId, 
lastAddConfirmed, length, data, masterKey, flags);
+        } else {
+            return computeDigestAndPackageForSendingV3(entryId, 
lastAddConfirmed, length, data);
+        }
+    }
+
+    private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, 
long lastAddConfirmed, long length,
+                                                                 ByteBuf data, 
byte[] masterKey, int flags) {
+        boolean isSmallEntry = data.readableBytes() < 
BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
+
+        int headersSize = 4 // Request header
+                        + BookieProtocol.MASTER_KEY_LENGTH // for the master 
key
+                        + METADATA_LENGTH  //
+                        + macCodeLength;
+        int payloadSize = data.readableBytes();
+        int bufferSize = 4 + headersSize + (isSmallEntry ? payloadSize : 0);
+
+        ByteBuf buf = allocator.buffer(bufferSize, bufferSize);
+        buf.writeInt(headersSize + payloadSize);
+        buf.writeInt(
+                BookieProtocol.PacketHeader.toInt(
+                        BookieProtocol.CURRENT_PROTOCOL_VERSION, 
BookieProtocol.ADDENTRY, (short) flags));
+        buf.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
+
+        // The checksum is computed on the next part of the buffer only
+        buf.readerIndex(buf.writerIndex());
+        buf.writeLong(ledgerId);
+        buf.writeLong(entryId);
+        buf.writeLong(lastAddConfirmed);
+        buf.writeLong(length);
+
+        // Compute checksum over the headers
+        int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());
+
+        // don't unwrap slices
+        final ByteBuf unwrapped = data.unwrap() != null && data.unwrap() 
instanceof CompositeByteBuf
+                ? data.unwrap() : data;
+        ReferenceCountUtil.retain(unwrapped);
+        ReferenceCountUtil.safeRelease(data);
+
+        if (unwrapped instanceof CompositeByteBuf) {
+            CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
+            for (int i = 0; i < cbb.numComponents(); i++) {
+                ByteBuf b = cbb.component(i);
+                digest = update(digest, b, b.readerIndex(), b.readableBytes());
+            }
+        } else {
+            digest = update(digest, unwrapped, unwrapped.readerIndex(), 
unwrapped.readableBytes());
+        }
+
+        populateValueAndReset(digest, buf);
+
+        // Reset the reader index to the beginning
+        buf.readerIndex(0);
+
+        if (isSmallEntry) {
+            buf.writeBytes(unwrapped);
+            unwrapped.release();
+            return buf;
         } else {
-            headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
+            return ByteBufList.get(buf, unwrapped);
         }
+    }
+
+    private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long 
lastAddConfirmed, long length,
+                                                            ByteBuf data) {
+        ByteBuf headersBuffer = Unpooled.buffer(METADATA_LENGTH + 
macCodeLength);
         headersBuffer.writeLong(ledgerId);
         headersBuffer.writeLong(entryId);
         headersBuffer.writeLong(lastAddConfirmed);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index d24022e504..3f8af53c13 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -27,8 +27,8 @@ import java.util.function.Function;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.meta.LedgerManager;
 import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
+import org.apache.bookkeeper.proto.MockBookieClient;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
-import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.versioning.Versioned;
 
 /**
@@ -48,8 +48,8 @@ public class ClientUtil {
             int offset, int len) throws GeneralSecurityException {
         DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2], 
DigestType.CRC32,
                 UnpooledByteBufAllocator.DEFAULT, true);
-        return 
ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId, 
lastAddConfirmed, length,
-                Unpooled.wrappedBuffer(data, offset, len)));
+        return 
MockBookieClient.copyDataWithSkipHeader(dm.computeDigestAndPackageForSending(entryId,
 lastAddConfirmed,
+            length, Unpooled.wrappedBuffer(data, offset, len), new byte[20], 
0));
     }
 
     /**
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
index 7d5cad6531..086e9f330c 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
@@ -20,7 +20,7 @@
 package org.apache.bookkeeper.client;
 
 import io.netty.buffer.ByteBuf;
-import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.proto.MockBookieClient;
 
 /**
  * Adapter for tests to get the public access from LedgerHandle for its default
@@ -28,8 +28,9 @@ import org.apache.bookkeeper.util.ByteBufList;
  */
 public class LedgerHandleAdapter {
 
-    public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf 
data) {
-        return 
lh.getDigestManager().computeDigestAndPackageForSending(entryId, 
lh.getLastAddConfirmed(),
-                lh.addToLength(data.readableBytes()), data);
+    public static ByteBuf toSend(LedgerHandle lh, long entryId, ByteBuf data) {
+        return MockBookieClient.copyData(lh.getDigestManager()
+                .computeDigestAndPackageForSending(entryId, 
lh.getLastAddConfirmed(),
+                        lh.addToLength(data.readableBytes()), data, new 
byte[20], 0));
     }
 }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 63255310c3..e771012470 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -35,6 +35,7 @@ import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ReferenceCounted;
 import java.security.GeneralSecurityException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -67,6 +68,7 @@ import org.apache.bookkeeper.proto.BookieAddressResolver;
 import org.apache.bookkeeper.proto.BookieClient;
 import org.apache.bookkeeper.proto.BookieProtocol;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.MockBookieClient;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.stats.NullStatsLogger;
 import org.apache.bookkeeper.util.ByteBufList;
@@ -504,10 +506,10 @@ public abstract class MockBookKeeperTestCase {
 
                 if (mockEntry != null) {
                     LOG.info("readEntry - found mock entry {}@{} at {}", 
entryId, ledgerId, bookieSocketAddress);
-                    ByteBufList entry = 
macManager.computeDigestAndPackageForSending(entryId,
+                    ReferenceCounted entry = 
macManager.computeDigestAndPackageForSending(entryId,
                         mockEntry.lastAddConfirmed, mockEntry.payload.length,
-                        Unpooled.wrappedBuffer(mockEntry.payload));
-                    callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, ByteBufList.coalesce(entry),
+                        Unpooled.wrappedBuffer(mockEntry.payload), new 
byte[20], 0);
+                    callback.readEntryComplete(BKException.Code.OK, ledgerId, 
entryId, MockBookieClient.copyData(entry),
                             args[4]);
                     entry.release();
                 } else {
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 0fa2f0d776..4efc4465e3 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
 import java.io.IOException;
 import java.util.Enumeration;
 import java.util.concurrent.CompletableFuture;
@@ -61,7 +62,6 @@ import 
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
 import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
 import org.apache.bookkeeper.proto.checksum.DigestManager;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.ByteBufList;
 import org.apache.bookkeeper.versioning.Version;
 import org.apache.bookkeeper.versioning.Versioned;
 import org.apache.commons.lang3.mutable.MutableInt;
@@ -425,9 +425,10 @@ public class ParallelLedgerRecoveryTest extends 
BookKeeperClusterTestCase {
         long entryId = 14;
         long lac = 8;
         byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8);
-        ByteBufList toSend =
+        ReferenceCounted toSend =
                 lh.macManager.computeDigestAndPackageForSending(
-                        entryId, lac, lh.getLength() + 100, 
Unpooled.wrappedBuffer(data, 0, data.length));
+                        entryId, lac, lh.getLength() + 100, 
Unpooled.wrappedBuffer(data, 0, data.length),
+                        new byte[20], 0);
         final CountDownLatch addLatch = new CountDownLatch(1);
         final AtomicBoolean addSuccess = new AtomicBoolean(false);
         LOG.info("Add entry {} with lac = {}", entryId, lac);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index b538a50c2a..8e3cfd72e4 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ReferenceCounted;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -166,10 +167,15 @@ public class ReadLastConfirmedAndEntryOpTest {
         final long lac = 1L;
 
         ByteBuf data = Unpooled.copiedBuffer("test-speculative-responses", 
UTF_8);
-        ByteBufList dataWithDigest = 
digestManager.computeDigestAndPackageForSending(
-            entryId, lac, data.readableBytes(), data);
-        byte[] bytesWithDigest = new byte[dataWithDigest.readableBytes()];
-        assertEquals(bytesWithDigest.length, 
dataWithDigest.getBytes(bytesWithDigest));
+        ReferenceCounted refCnt = 
digestManager.computeDigestAndPackageForSending(
+            entryId, lac, data.readableBytes(), data, new byte[20], 0);
+
+        byte[] bytesWithDigest = null;
+        if (refCnt instanceof ByteBufList) {
+            ByteBufList dataWithDigest = (ByteBufList) refCnt;
+            bytesWithDigest = new byte[dataWithDigest.readableBytes()];
+            assertEquals(bytesWithDigest.length, 
dataWithDigest.getBytes(bytesWithDigest));
+        }
 
         final Map<BookieId, ReadLastConfirmedAndEntryHolder> callbacks =
             Collections.synchronizedMap(new HashMap<>());
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index 82b031c5ae..a37462dee7 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -23,8 +23,10 @@ package org.apache.bookkeeper.client;
 import static org.junit.Assert.assertEquals;
 
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.proto.MockBookieClient;
 import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
 import org.apache.bookkeeper.util.ByteBufList;
 import org.junit.Test;
@@ -57,17 +59,20 @@ public class TestPendingReadLacOp extends 
BookKeeperClusterTestCase {
             public void initiate() {
                 for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) {
                     final int index = i;
-                    ByteBufList buffer = 
lh.getDigestManager().computeDigestAndPackageForSending(
+                    ReferenceCounted toSend = 
lh.getDigestManager().computeDigestAndPackageForSending(
                             2,
                             1,
                             data.length,
-                            Unpooled.wrappedBuffer(data));
+                            Unpooled.wrappedBuffer(data),
+                            new byte[20],
+                            0);
+
                     bkc.scheduler.schedule(() -> {
                         readLacComplete(
                                 0,
                                 lh.getId(),
                                 null,
-                                Unpooled.copiedBuffer(buffer.toArray()),
+                                MockBookieClient.copyData(toSend),
                                 index);
 
                     }, 0, TimeUnit.SECONDS);
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
index ce8d65fb76..8721a2c781 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
@@ -18,6 +18,8 @@
  */
 package org.apache.bookkeeper.proto;
 
+import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.test.TestStatsProvider;
 import org.junit.Before;
 
 /**
@@ -30,6 +32,8 @@ public class BookieBackpressureForV2Test extends 
BookieBackpressureTest {
     public void setUp() throws Exception {
         super.setUp();
         baseClientConf.setUseV2WireProtocol(true);
+        bkc = new BookKeeperTestClient(baseClientConf, new 
TestStatsProvider());
+
         // the backpressure will bloc the read response, disable it to let it 
use backpressure mechanism
         confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false);
     }
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 2de9ab5e19..c4344c74d0 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -24,6 +24,7 @@ import static 
org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -148,7 +149,7 @@ public class MockBookieClient implements BookieClient {
 
     @Override
     public void addEntry(BookieId addr, long ledgerId, byte[] masterKey,
-                         long entryId, ByteBufList toSend, WriteCallback cb, 
Object ctx,
+                         long entryId, ReferenceCounted toSend, WriteCallback 
cb, Object ctx,
                          int options, boolean allowFastFail, 
EnumSet<WriteFlag> writeFlags) {
         toSend.retain();
         preWriteHook.runHook(addr, ledgerId, entryId)
@@ -262,11 +263,29 @@ public class MockBookieClient implements BookieClient {
     public void close() {
     }
 
-    private static ByteBuf copyData(ByteBufList list) {
-        ByteBuf buf = Unpooled.buffer(list.readableBytes());
-        for (int i = 0; i < list.size(); i++) {
-            buf.writeBytes(list.getBuffer(i).slice());
+    public static ByteBuf copyData(ReferenceCounted rc) {
+        ByteBuf res;
+        if (rc instanceof ByteBuf) {
+            res = Unpooled.copiedBuffer((ByteBuf) rc);
+        } else {
+            res = ByteBufList.coalesce((ByteBufList) rc);
         }
-        return buf;
+
+        return res;
+    }
+
+    public static ByteBuf copyDataWithSkipHeader(ReferenceCounted rc) {
+        ByteBuf res;
+        if (rc instanceof ByteBuf) {
+            res = Unpooled.copiedBuffer((ByteBuf) rc);
+        } else {
+            res = ByteBufList.coalesce((ByteBufList) rc);
+        }
+
+        // Skip headers
+        res.skipBytes(28);
+        rc.release();
+
+        return res;
     }
-}
\ No newline at end of file
+}
diff --git 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
index c670e87ff0..cef77c3f99 100644
--- 
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
+++ 
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
@@ -84,8 +84,8 @@ public class MockBookies {
         DigestManager digestManager = DigestManager.instantiate(ledgerId, new 
byte[0],
                 DataFormats.LedgerMetadataFormat.DigestType.CRC32C,
                 UnpooledByteBufAllocator.DEFAULT, false);
-        return 
ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
-                entryId, lac, 0, Unpooled.buffer(10)));
+        return ByteBufList.coalesce((ByteBufList) 
digestManager.computeDigestAndPackageForSending(
+                entryId, lac, 0, Unpooled.buffer(10), new byte[20], 0));
 
     }
 
diff --git 
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
 
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
index 7eb8fa9774..308463608b 100644
--- 
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
+++ 
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
@@ -79,22 +79,6 @@ public class ProtocolBenchmark {
         this.reqEnDeV3 = new RequestEnDecoderV3(null);
     }
 
-
-    @Benchmark
-    public void testAddEntryV2() throws Exception {
-        ByteBufList list = ByteBufList.get(entry.retainedSlice());
-        BookieProtocol.AddRequest req = BookieProtocol.AddRequest.create(
-                BookieProtocol.CURRENT_PROTOCOL_VERSION,
-                ledgerId,
-                entryId,
-                flags,
-                masterKey,
-                list);
-        Object res = this.reqEnDeV2.encode(req, ByteBufAllocator.DEFAULT);
-        ReferenceCountUtil.release(res);
-        ReferenceCountUtil.release(list);
-    }
-
     @Benchmark
     public void testAddEntryV3() throws Exception {
         // Build the request and calculate the total size to be included in 
the packet.
diff --git 
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
 
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
index d954562016..04fa500d47 100644
--- 
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
+++ 
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
@@ -83,8 +83,9 @@ public class DigestManagerBenchmark {
             data.writeBytes(randomBytes(entrySize));
 
             digestBuf = ByteBufAllocator.DEFAULT.directBuffer();
-            digestBuf.writeBytes(ByteBufList.coalesce(
-                    dm.computeDigestAndPackageForSending(1234, 1234, 
entrySize, data)));
+            digestBuf.writeBytes((ByteBuf)
+                    dm.computeDigestAndPackageForSending(1234, 1234, 
entrySize, data,
+                            new byte[0], 0));
         }
     }
 


Reply via email to