This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 234b817cdb Single buffer for small add requests (#3783)
234b817cdb is described below
commit 234b817cdb4e054887ffd5e42eaed25dc02daf63
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Mar 20 10:38:11 2023 -0700
Single buffer for small add requests (#3783)
* Single buffer for small add requests
* Fixed checkstyle
* Fixed treating of ComposityByteBuf
* Fixed merge issues
* Fixed merge issues
* WIP
* Fixed test and removed dead code
* Removed unused import
* Fixed BookieJournalTest
* removed unused import
* fix the checkstyle
* fix failed test
* fix failed test
---------
Co-authored-by: chenhang <[email protected]>
---
.../client/LedgerFragmentReplicator.java | 17 +++--
.../org/apache/bookkeeper/client/PendingAddOp.java | 7 +-
.../org/apache/bookkeeper/proto/AuthHandler.java | 6 +-
.../org/apache/bookkeeper/proto/BookieClient.java | 3 +-
.../apache/bookkeeper/proto/BookieClientImpl.java | 7 +-
.../bookkeeper/proto/BookieProtoEncoding.java | 22 ++-----
.../apache/bookkeeper/proto/BookieProtocol.java | 53 ---------------
.../bookkeeper/proto/PerChannelBookieClient.java | 31 +++++----
.../bookkeeper/proto/checksum/DigestManager.java | 75 ++++++++++++++++++++--
.../org/apache/bookkeeper/client/ClientUtil.java | 6 +-
.../bookkeeper/client/LedgerHandleAdapter.java | 9 +--
.../bookkeeper/client/MockBookKeeperTestCase.java | 8 ++-
.../client/ParallelLedgerRecoveryTest.java | 7 +-
.../client/ReadLastConfirmedAndEntryOpTest.java | 14 ++--
.../bookkeeper/client/TestPendingReadLacOp.java | 11 +++-
.../proto/BookieBackpressureForV2Test.java | 4 ++
.../apache/bookkeeper/proto/MockBookieClient.java | 33 ++++++++--
.../org/apache/bookkeeper/proto/MockBookies.java | 4 +-
.../apache/bookkeeper/proto/ProtocolBenchmark.java | 16 -----
.../proto/checksum/DigestManagerBenchmark.java | 5 +-
20 files changed, 187 insertions(+), 151 deletions(-)
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
index 877a3ac300..6b439b0960 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java
@@ -29,7 +29,9 @@ import static
org.apache.bookkeeper.replication.ReplicationStats.REPLICATION_WOR
import static
org.apache.bookkeeper.replication.ReplicationStats.WRITE_DATA_LATENCY;
import com.google.common.util.concurrent.RateLimiter;
+import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Iterator;
@@ -403,17 +405,24 @@ public class LedgerFragmentReplicator {
numEntriesRead.inc();
numBytesRead.registerSuccessfulValue(dataLength);
- ByteBufList toSend = lh.getDigestManager()
+ ReferenceCounted toSend = lh.getDigestManager()
.computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(), entry.getLength(),
- Unpooled.wrappedBuffer(data, 0, data.length));
+ Unpooled.wrappedBuffer(data, 0, data.length),
+ lh.getLedgerKey(),
+ 0
+ );
if (replicationThrottle != null) {
- updateAverageEntrySize(toSend.readableBytes());
+ if (toSend instanceof ByteBuf) {
+ updateAverageEntrySize(((ByteBuf)
toSend).readableBytes());
+ } else if (toSend instanceof ByteBufList) {
+ updateAverageEntrySize(((ByteBufList)
toSend).readableBytes());
+ }
}
for (BookieId newBookie : newBookies) {
long startWriteEntryTime = MathUtils.nowInNano();
bkc.getBookieClient().addEntry(newBookie, lh.getId(),
- lh.getLedgerKey(), entryId,
ByteBufList.clone(toSend),
+ lh.getLedgerKey(), entryId, toSend,
multiWriteCallback, dataLength,
BookieProtocol.FLAG_RECOVERY_ADD,
false, WriteFlag.NONE);
writeDataLatency.registerSuccessfulEvent(
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
index 394c961cbc..51f559a86c 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
@@ -38,7 +39,6 @@ import
org.apache.bookkeeper.client.AsyncCallback.AddCallbackWithLatency;
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
-import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.util.MathUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,7 +56,7 @@ class PendingAddOp implements WriteCallback {
private static final Logger LOG =
LoggerFactory.getLogger(PendingAddOp.class);
ByteBuf payload;
- ByteBufList toSend;
+ ReferenceCounted toSend;
AddCallbackWithLatency cb;
Object ctx;
long entryId;
@@ -242,9 +242,10 @@ class PendingAddOp implements WriteCallback {
checkNotNull(lh);
checkNotNull(lh.macManager);
+ int flags = isRecoveryAdd ? FLAG_RECOVERY_ADD | FLAG_HIGH_PRIORITY :
FLAG_NONE;
this.toSend = lh.macManager.computeDigestAndPackageForSending(
entryId, lh.lastAddConfirmed, currentLedgerLength,
- payload);
+ payload, lh.ledgerKey, flags);
// ownership of RefCounted ByteBuf was passed to
computeDigestAndPackageForSending
payload = null;
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
index ba5dc9948d..f923b61ad5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/AuthHandler.java
@@ -23,6 +23,7 @@ package org.apache.bookkeeper.proto;
import static
org.apache.bookkeeper.auth.AuthProviderFactoryFactory.AUTHENTICATION_DISABLED_PLUGIN_NAME;
import com.google.protobuf.ByteString;
+import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
@@ -39,6 +40,7 @@ import org.apache.bookkeeper.auth.BookieAuthProvider;
import org.apache.bookkeeper.auth.ClientAuthProvider;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
+import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -358,8 +360,10 @@ class AuthHandler {
} else {
waitingForAuth.add(msg);
}
+ } else if (msg instanceof ByteBuf || msg instanceof
ByteBufList) {
+ waitingForAuth.add(msg);
} else {
- LOG.info("dropping write of message {}", msg);
+ LOG.info("[{}] dropping write of message {}",
ctx.channel(), msg);
}
}
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
index 81be386f7c..938874fac0 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClient.java
@@ -20,6 +20,7 @@
*/
package org.apache.bookkeeper.proto;
+import io.netty.util.ReferenceCounted;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@@ -139,7 +140,7 @@ public interface BookieClient {
* {@link org.apache.bookkeeper.client.api.WriteFlag}
*/
void addEntry(BookieId address, long ledgerId, byte[] masterKey,
- long entryId, ByteBufList toSend, WriteCallback cb, Object
ctx,
+ long entryId, ReferenceCounted toSend, WriteCallback cb,
Object ctx,
int options, boolean allowFastFail, EnumSet<WriteFlag>
writeFlags);
/**
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
index cd11bc17d7..c305a51ea4 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieClientImpl.java
@@ -32,6 +32,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.io.IOException;
import java.util.EnumSet;
@@ -288,7 +289,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
final long ledgerId,
final byte[] masterKey,
final long entryId,
- final ByteBufList toSend,
+ final ReferenceCounted toSend,
final WriteCallback cb,
final Object ctx,
final int options,
@@ -357,7 +358,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
private final Handle<ChannelReadyForAddEntryCallback> recyclerHandle;
private BookieClientImpl bookieClient;
- private ByteBufList toSend;
+ private ReferenceCounted toSend;
private long ledgerId;
private long entryId;
private BookieId addr;
@@ -369,7 +370,7 @@ public class BookieClientImpl implements BookieClient,
PerChannelBookieClientFac
private EnumSet<WriteFlag> writeFlags;
static ChannelReadyForAddEntryCallback create(
- BookieClientImpl bookieClient, ByteBufList toSend, long
ledgerId,
+ BookieClientImpl bookieClient, ReferenceCounted toSend, long
ledgerId,
long entryId, BookieId addr, Object ctx,
WriteCallback cb, int options, byte[] masterKey, boolean
allowFastFail,
EnumSet<WriteFlag> writeFlags) {
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
index 8cf40d2b8e..c56235dbe6 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtoEncoding.java
@@ -110,23 +110,7 @@ public class BookieProtoEncoding {
return msg;
}
BookieProtocol.Request r = (BookieProtocol.Request) msg;
- if (r instanceof BookieProtocol.AddRequest) {
- BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest) r;
- ByteBufList data = ar.getData();
-
- int totalHeaderSize = 4 // for the request header
- + BookieProtocol.MASTER_KEY_LENGTH; // for the master
key
-
- int totalPayloadSize = totalHeaderSize + data.readableBytes();
- ByteBuf buf = allocator.buffer(totalHeaderSize + 4 /* frame
size */);
- buf.writeInt(totalPayloadSize); // Frame header
- buf.writeInt(PacketHeader.toInt(r.getProtocolVersion(),
r.getOpCode(), r.getFlags()));
- buf.writeBytes(r.getMasterKey(), 0,
BookieProtocol.MASTER_KEY_LENGTH);
-
- ar.recycle();
- data.prepend(buf);
- return data;
- } else if (r instanceof BookieProtocol.ReadRequest) {
+ if (r instanceof BookieProtocol.ReadRequest) {
int totalHeaderSize = 4 // for request type
+ 8 // for ledgerId
+ 8; // for entryId
@@ -437,7 +421,9 @@ public class BookieProtoEncoding {
if (LOG.isTraceEnabled()) {
LOG.trace("Encode request {} to channel {}.", msg,
ctx.channel());
}
- if (msg instanceof BookkeeperProtocol.Request) {
+ if (msg instanceof ByteBuf || msg instanceof ByteBufList) {
+ ctx.write(msg, promise);
+ } else if (msg instanceof BookkeeperProtocol.Request) {
ctx.write(reqV3.encode(msg, ctx.alloc()), promise);
} else if (msg instanceof BookieProtocol.Request) {
ctx.write(reqPreV3.encode(msg, ctx.alloc()), promise);
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
index 86c3ed5469..3a27f08a95 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/BookieProtocol.java
@@ -27,7 +27,6 @@ import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.ReferenceCounted;
import org.apache.bookkeeper.proto.BookkeeperProtocol.AuthMessage;
-import org.apache.bookkeeper.util.ByteBufList;
/**
* The packets of the Bookie protocol all have a 4-byte integer indicating the
@@ -252,58 +251,6 @@ public interface BookieProtocol {
public void recycle() {}
}
- /**
- * A Request that adds data.
- */
- class AddRequest extends Request {
- ByteBufList data;
-
- static AddRequest create(byte protocolVersion, long ledgerId,
- long entryId, short flags, byte[] masterKey,
- ByteBufList data) {
- AddRequest add = RECYCLER.get();
- add.protocolVersion = protocolVersion;
- add.opCode = ADDENTRY;
- add.ledgerId = ledgerId;
- add.entryId = entryId;
- add.flags = flags;
- add.masterKey = masterKey;
- add.data = data.retain();
- return add;
- }
-
- ByteBufList getData() {
- // We need to have different ByteBufList instances for each bookie
write
- return ByteBufList.clone(data);
- }
-
- boolean isRecoveryAdd() {
- return (flags & FLAG_RECOVERY_ADD) == FLAG_RECOVERY_ADD;
- }
-
- private final Handle<AddRequest> recyclerHandle;
- private AddRequest(Handle<AddRequest> recyclerHandle) {
- this.recyclerHandle = recyclerHandle;
- }
-
- private static final Recycler<AddRequest> RECYCLER = new
Recycler<AddRequest>() {
- @Override
- protected AddRequest newObject(Handle<AddRequest> handle) {
- return new AddRequest(handle);
- }
- };
-
- @Override
- public void recycle() {
- ledgerId = -1;
- entryId = -1;
- masterKey = null;
- ReferenceCountUtil.release(data);
- data = null;
- recyclerHandle.recycle(this);
- }
- }
-
/**
* This is similar to add request, but it used when processing the request
on the bookie side.
*/
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
index 4d08faad20..41add30ca5 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/PerChannelBookieClient.java
@@ -62,6 +62,7 @@ import io.netty.incubator.channel.uring.IOUringSocketChannel;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
@@ -771,7 +772,7 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
* @param writeFlags
* WriteFlags
*/
- void addEntry(final long ledgerId, byte[] masterKey, final long entryId,
ByteBufList toSend, WriteCallback cb,
+ void addEntry(final long ledgerId, byte[] masterKey, final long entryId,
ReferenceCounted toSend, WriteCallback cb,
Object ctx, final int options, boolean allowFastFail, final
EnumSet<WriteFlag> writeFlags) {
Object request = null;
CompletionKey completionKey = null;
@@ -782,9 +783,12 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
return;
}
completionKey = acquireV2Key(ledgerId, entryId,
OperationType.ADD_ENTRY);
- request = BookieProtocol.AddRequest.create(
- BookieProtocol.CURRENT_PROTOCOL_VERSION, ledgerId, entryId,
- (short) options, masterKey, toSend);
+
+ if (toSend instanceof ByteBuf) {
+ request = ((ByteBuf) toSend).retainedDuplicate();
+ } else {
+ request = ByteBufList.clone((ByteBufList) toSend);
+ }
} else {
final long txnId = getTxnId();
completionKey = new V3CompletionKey(txnId,
OperationType.ADD_ENTRY);
@@ -799,11 +803,14 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
}
ByteString body = null;
- if (toSend.hasArray()) {
- body = UnsafeByteOperations.unsafeWrap(toSend.array(),
toSend.arrayOffset(), toSend.readableBytes());
+ ByteBufList bufToSend = (ByteBufList) toSend;
+
+ if (bufToSend.hasArray()) {
+ body = UnsafeByteOperations.unsafeWrap(bufToSend.array(),
bufToSend.arrayOffset(),
+ bufToSend.readableBytes());
} else {
- for (int i = 0; i < toSend.size(); i++) {
- ByteString piece =
UnsafeByteOperations.unsafeWrap(toSend.getBuffer(i).nioBuffer());
+ for (int i = 0; i < bufToSend.size(); i++) {
+ ByteString piece =
UnsafeByteOperations.unsafeWrap(bufToSend.getBuffer(i).nioBuffer());
// use ByteString.concat to avoid byte[] allocation when
toSend has multiple ByteBufs
body = (body == null) ? piece : body.concat(piece);
}
@@ -1143,14 +1150,6 @@ public class PerChannelBookieClient extends
ChannelInboundHandlerAdapter {
StringUtils.requestToString(request));
errorOut(key, BKException.Code.TooManyRequestsException);
-
- // If the request is a V2 add request, we retained the data's
reference when creating the AddRequest
- // object. To avoid the object leak, we need to release the
reference if we met any errors
- // before sending it.
- if (request instanceof BookieProtocol.AddRequest) {
- BookieProtocol.AddRequest ar = (BookieProtocol.AddRequest)
request;
- ar.recycle();
- }
return;
}
diff --git
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
index a97c301311..9f931f731a 100644
---
a/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
+++
b/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java
@@ -24,11 +24,14 @@ import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.util.ReferenceCountUtil;
+import io.netty.util.ReferenceCounted;
import io.netty.util.concurrent.FastThreadLocal;
import java.security.GeneralSecurityException;
import java.security.NoSuchAlgorithmException;
import org.apache.bookkeeper.client.BKException.BKDigestMatchException;
import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.proto.BookieProtoEncoding;
+import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
import org.apache.bookkeeper.util.ByteBufList;
import org.slf4j.Logger;
@@ -97,14 +100,76 @@ public abstract class DigestManager {
* @param data
* @return
*/
- public ByteBufList computeDigestAndPackageForSending(long entryId, long
lastAddConfirmed, long length,
- ByteBuf data) {
- ByteBuf headersBuffer;
+ public ReferenceCounted computeDigestAndPackageForSending(long entryId,
long lastAddConfirmed, long length,
+ ByteBuf data,
byte[] masterKey, int flags) {
if (this.useV2Protocol) {
- headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
+ return computeDigestAndPackageForSendingV2(entryId,
lastAddConfirmed, length, data, masterKey, flags);
+ } else {
+ return computeDigestAndPackageForSendingV3(entryId,
lastAddConfirmed, length, data);
+ }
+ }
+
+ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId,
long lastAddConfirmed, long length,
+ ByteBuf data,
byte[] masterKey, int flags) {
+ boolean isSmallEntry = data.readableBytes() <
BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
+
+ int headersSize = 4 // Request header
+ + BookieProtocol.MASTER_KEY_LENGTH // for the master
key
+ + METADATA_LENGTH //
+ + macCodeLength;
+ int payloadSize = data.readableBytes();
+ int bufferSize = 4 + headersSize + (isSmallEntry ? payloadSize : 0);
+
+ ByteBuf buf = allocator.buffer(bufferSize, bufferSize);
+ buf.writeInt(headersSize + payloadSize);
+ buf.writeInt(
+ BookieProtocol.PacketHeader.toInt(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION,
BookieProtocol.ADDENTRY, (short) flags));
+ buf.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
+
+ // The checksum is computed on the next part of the buffer only
+ buf.readerIndex(buf.writerIndex());
+ buf.writeLong(ledgerId);
+ buf.writeLong(entryId);
+ buf.writeLong(lastAddConfirmed);
+ buf.writeLong(length);
+
+ // Compute checksum over the headers
+ int digest = update(0, buf, buf.readerIndex(), buf.readableBytes());
+
+ // don't unwrap slices
+ final ByteBuf unwrapped = data.unwrap() != null && data.unwrap()
instanceof CompositeByteBuf
+ ? data.unwrap() : data;
+ ReferenceCountUtil.retain(unwrapped);
+ ReferenceCountUtil.safeRelease(data);
+
+ if (unwrapped instanceof CompositeByteBuf) {
+ CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
+ for (int i = 0; i < cbb.numComponents(); i++) {
+ ByteBuf b = cbb.component(i);
+ digest = update(digest, b, b.readerIndex(), b.readableBytes());
+ }
+ } else {
+ digest = update(digest, unwrapped, unwrapped.readerIndex(),
unwrapped.readableBytes());
+ }
+
+ populateValueAndReset(digest, buf);
+
+ // Reset the reader index to the beginning
+ buf.readerIndex(0);
+
+ if (isSmallEntry) {
+ buf.writeBytes(unwrapped);
+ unwrapped.release();
+ return buf;
} else {
- headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
+ return ByteBufList.get(buf, unwrapped);
}
+ }
+
+ private ByteBufList computeDigestAndPackageForSendingV3(long entryId, long
lastAddConfirmed, long length,
+ ByteBuf data) {
+ ByteBuf headersBuffer = Unpooled.buffer(METADATA_LENGTH +
macCodeLength);
headersBuffer.writeLong(ledgerId);
headersBuffer.writeLong(entryId);
headersBuffer.writeLong(lastAddConfirmed);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
index d24022e504..3f8af53c13 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java
@@ -27,8 +27,8 @@ import java.util.function.Function;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.meta.LedgerManager;
import org.apache.bookkeeper.proto.DataFormats.LedgerMetadataFormat.DigestType;
+import org.apache.bookkeeper.proto.MockBookieClient;
import org.apache.bookkeeper.proto.checksum.DigestManager;
-import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Versioned;
/**
@@ -48,8 +48,8 @@ public class ClientUtil {
int offset, int len) throws GeneralSecurityException {
DigestManager dm = DigestManager.instantiate(ledgerId, new byte[2],
DigestType.CRC32,
UnpooledByteBufAllocator.DEFAULT, true);
- return
ByteBufList.coalesce(dm.computeDigestAndPackageForSending(entryId,
lastAddConfirmed, length,
- Unpooled.wrappedBuffer(data, offset, len)));
+ return
MockBookieClient.copyDataWithSkipHeader(dm.computeDigestAndPackageForSending(entryId,
lastAddConfirmed,
+ length, Unpooled.wrappedBuffer(data, offset, len), new byte[20],
0));
}
/**
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
index 7d5cad6531..086e9f330c 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerHandleAdapter.java
@@ -20,7 +20,7 @@
package org.apache.bookkeeper.client;
import io.netty.buffer.ByteBuf;
-import org.apache.bookkeeper.util.ByteBufList;
+import org.apache.bookkeeper.proto.MockBookieClient;
/**
* Adapter for tests to get the public access from LedgerHandle for its default
@@ -28,8 +28,9 @@ import org.apache.bookkeeper.util.ByteBufList;
*/
public class LedgerHandleAdapter {
- public static ByteBufList toSend(LedgerHandle lh, long entryId, ByteBuf
data) {
- return
lh.getDigestManager().computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(),
- lh.addToLength(data.readableBytes()), data);
+ public static ByteBuf toSend(LedgerHandle lh, long entryId, ByteBuf data) {
+ return MockBookieClient.copyData(lh.getDigestManager()
+ .computeDigestAndPackageForSending(entryId,
lh.getLastAddConfirmed(),
+ lh.addToLength(data.readableBytes()), data, new
byte[20], 0));
}
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
index 63255310c3..e771012470 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/MockBookKeeperTestCase.java
@@ -35,6 +35,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ReferenceCounted;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
@@ -67,6 +68,7 @@ import org.apache.bookkeeper.proto.BookieAddressResolver;
import org.apache.bookkeeper.proto.BookieClient;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
+import org.apache.bookkeeper.proto.MockBookieClient;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.util.ByteBufList;
@@ -504,10 +506,10 @@ public abstract class MockBookKeeperTestCase {
if (mockEntry != null) {
LOG.info("readEntry - found mock entry {}@{} at {}",
entryId, ledgerId, bookieSocketAddress);
- ByteBufList entry =
macManager.computeDigestAndPackageForSending(entryId,
+ ReferenceCounted entry =
macManager.computeDigestAndPackageForSending(entryId,
mockEntry.lastAddConfirmed, mockEntry.payload.length,
- Unpooled.wrappedBuffer(mockEntry.payload));
- callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, ByteBufList.coalesce(entry),
+ Unpooled.wrappedBuffer(mockEntry.payload), new
byte[20], 0);
+ callback.readEntryComplete(BKException.Code.OK, ledgerId,
entryId, MockBookieClient.copyData(entry),
args[4]);
entry.release();
} else {
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
index 0fa2f0d776..4efc4465e3 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java
@@ -26,6 +26,7 @@ import static org.junit.Assert.assertTrue;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
@@ -61,7 +62,6 @@ import
org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.Processor;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.WriteCallback;
import org.apache.bookkeeper.proto.checksum.DigestManager;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
-import org.apache.bookkeeper.util.ByteBufList;
import org.apache.bookkeeper.versioning.Version;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.commons.lang3.mutable.MutableInt;
@@ -425,9 +425,10 @@ public class ParallelLedgerRecoveryTest extends
BookKeeperClusterTestCase {
long entryId = 14;
long lac = 8;
byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8);
- ByteBufList toSend =
+ ReferenceCounted toSend =
lh.macManager.computeDigestAndPackageForSending(
- entryId, lac, lh.getLength() + 100,
Unpooled.wrappedBuffer(data, 0, data.length));
+ entryId, lac, lh.getLength() + 100,
Unpooled.wrappedBuffer(data, 0, data.length),
+ new byte[20], 0);
final CountDownLatch addLatch = new CountDownLatch(1);
final AtomicBoolean addSuccess = new AtomicBoolean(false);
LOG.info("Add entry {} with lac = {}", entryId, lac);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
index b538a50c2a..8e3cfd72e4 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ReadLastConfirmedAndEntryOpTest.java
@@ -33,6 +33,7 @@ import static org.mockito.Mockito.when;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.util.ReferenceCounted;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -166,10 +167,15 @@ public class ReadLastConfirmedAndEntryOpTest {
final long lac = 1L;
ByteBuf data = Unpooled.copiedBuffer("test-speculative-responses",
UTF_8);
- ByteBufList dataWithDigest =
digestManager.computeDigestAndPackageForSending(
- entryId, lac, data.readableBytes(), data);
- byte[] bytesWithDigest = new byte[dataWithDigest.readableBytes()];
- assertEquals(bytesWithDigest.length,
dataWithDigest.getBytes(bytesWithDigest));
+ ReferenceCounted refCnt =
digestManager.computeDigestAndPackageForSending(
+ entryId, lac, data.readableBytes(), data, new byte[20], 0);
+
+ byte[] bytesWithDigest = null;
+ if (refCnt instanceof ByteBufList) {
+ ByteBufList dataWithDigest = (ByteBufList) refCnt;
+ bytesWithDigest = new byte[dataWithDigest.readableBytes()];
+ assertEquals(bytesWithDigest.length,
dataWithDigest.getBytes(bytesWithDigest));
+ }
final Map<BookieId, ReadLastConfirmedAndEntryHolder> callbacks =
Collections.synchronizedMap(new HashMap<>());
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
index 82b031c5ae..a37462dee7 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestPendingReadLacOp.java
@@ -23,8 +23,10 @@ package org.apache.bookkeeper.client;
import static org.junit.Assert.assertEquals;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import org.apache.bookkeeper.proto.MockBookieClient;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.util.ByteBufList;
import org.junit.Test;
@@ -57,17 +59,20 @@ public class TestPendingReadLacOp extends
BookKeeperClusterTestCase {
public void initiate() {
for (int i = 0; i < lh.getCurrentEnsemble().size(); i++) {
final int index = i;
- ByteBufList buffer =
lh.getDigestManager().computeDigestAndPackageForSending(
+ ReferenceCounted toSend =
lh.getDigestManager().computeDigestAndPackageForSending(
2,
1,
data.length,
- Unpooled.wrappedBuffer(data));
+ Unpooled.wrappedBuffer(data),
+ new byte[20],
+ 0);
+
bkc.scheduler.schedule(() -> {
readLacComplete(
0,
lh.getId(),
null,
- Unpooled.copiedBuffer(buffer.toArray()),
+ MockBookieClient.copyData(toSend),
index);
}, 0, TimeUnit.SECONDS);
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
index ce8d65fb76..8721a2c781 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/BookieBackpressureForV2Test.java
@@ -18,6 +18,8 @@
*/
package org.apache.bookkeeper.proto;
+import org.apache.bookkeeper.client.BookKeeperTestClient;
+import org.apache.bookkeeper.test.TestStatsProvider;
import org.junit.Before;
/**
@@ -30,6 +32,8 @@ public class BookieBackpressureForV2Test extends
BookieBackpressureTest {
public void setUp() throws Exception {
super.setUp();
baseClientConf.setUseV2WireProtocol(true);
+ bkc = new BookKeeperTestClient(baseClientConf, new
TestStatsProvider());
+
// the backpressure will bloc the read response, disable it to let it
use backpressure mechanism
confByIndex(0).setReadWorkerThreadsThrottlingEnabled(false);
}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
index 2de9ab5e19..c4344c74d0 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookieClient.java
@@ -24,6 +24,7 @@ import static
org.apache.bookkeeper.proto.BookieProtocol.FLAG_RECOVERY_ADD;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
+import io.netty.util.ReferenceCounted;
import java.util.Arrays;
import java.util.Collections;
import java.util.EnumSet;
@@ -148,7 +149,7 @@ public class MockBookieClient implements BookieClient {
@Override
public void addEntry(BookieId addr, long ledgerId, byte[] masterKey,
- long entryId, ByteBufList toSend, WriteCallback cb,
Object ctx,
+ long entryId, ReferenceCounted toSend, WriteCallback
cb, Object ctx,
int options, boolean allowFastFail,
EnumSet<WriteFlag> writeFlags) {
toSend.retain();
preWriteHook.runHook(addr, ledgerId, entryId)
@@ -262,11 +263,29 @@ public class MockBookieClient implements BookieClient {
public void close() {
}
- private static ByteBuf copyData(ByteBufList list) {
- ByteBuf buf = Unpooled.buffer(list.readableBytes());
- for (int i = 0; i < list.size(); i++) {
- buf.writeBytes(list.getBuffer(i).slice());
+ public static ByteBuf copyData(ReferenceCounted rc) {
+ ByteBuf res;
+ if (rc instanceof ByteBuf) {
+ res = Unpooled.copiedBuffer((ByteBuf) rc);
+ } else {
+ res = ByteBufList.coalesce((ByteBufList) rc);
}
- return buf;
+
+ return res;
+ }
+
+ public static ByteBuf copyDataWithSkipHeader(ReferenceCounted rc) {
+ ByteBuf res;
+ if (rc instanceof ByteBuf) {
+ res = Unpooled.copiedBuffer((ByteBuf) rc);
+ } else {
+ res = ByteBufList.coalesce((ByteBufList) rc);
+ }
+
+ // Skip headers
+ res.skipBytes(28);
+ rc.release();
+
+ return res;
}
-}
\ No newline at end of file
+}
diff --git
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
index c670e87ff0..cef77c3f99 100644
---
a/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
+++
b/bookkeeper-server/src/test/java/org/apache/bookkeeper/proto/MockBookies.java
@@ -84,8 +84,8 @@ public class MockBookies {
DigestManager digestManager = DigestManager.instantiate(ledgerId, new
byte[0],
DataFormats.LedgerMetadataFormat.DigestType.CRC32C,
UnpooledByteBufAllocator.DEFAULT, false);
- return
ByteBufList.coalesce(digestManager.computeDigestAndPackageForSending(
- entryId, lac, 0, Unpooled.buffer(10)));
+ return ByteBufList.coalesce((ByteBufList)
digestManager.computeDigestAndPackageForSending(
+ entryId, lac, 0, Unpooled.buffer(10), new byte[20], 0));
}
diff --git
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
index 7eb8fa9774..308463608b 100644
---
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
+++
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/ProtocolBenchmark.java
@@ -79,22 +79,6 @@ public class ProtocolBenchmark {
this.reqEnDeV3 = new RequestEnDecoderV3(null);
}
-
- @Benchmark
- public void testAddEntryV2() throws Exception {
- ByteBufList list = ByteBufList.get(entry.retainedSlice());
- BookieProtocol.AddRequest req = BookieProtocol.AddRequest.create(
- BookieProtocol.CURRENT_PROTOCOL_VERSION,
- ledgerId,
- entryId,
- flags,
- masterKey,
- list);
- Object res = this.reqEnDeV2.encode(req, ByteBufAllocator.DEFAULT);
- ReferenceCountUtil.release(res);
- ReferenceCountUtil.release(list);
- }
-
@Benchmark
public void testAddEntryV3() throws Exception {
// Build the request and calculate the total size to be included in
the packet.
diff --git
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
index d954562016..04fa500d47 100644
---
a/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
+++
b/microbenchmarks/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManagerBenchmark.java
@@ -83,8 +83,9 @@ public class DigestManagerBenchmark {
data.writeBytes(randomBytes(entrySize));
digestBuf = ByteBufAllocator.DEFAULT.directBuffer();
- digestBuf.writeBytes(ByteBufList.coalesce(
- dm.computeDigestAndPackageForSending(1234, 1234,
entrySize, data)));
+ digestBuf.writeBytes((ByteBuf)
+ dm.computeDigestAndPackageForSending(1234, 1234,
entrySize, data,
+ new byte[0], 0));
}
}