hangc0276 commented on code in PR #3783:
URL: https://github.com/apache/bookkeeper/pull/3783#discussion_r1118231210
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java:
##########
@@ -96,14 +99,72 @@ public static byte[] generateMasterKey(byte[] password)
throws NoSuchAlgorithmEx
* @param data
* @return
*/
- public ByteBufList computeDigestAndPackageForSending(long entryId, long
lastAddConfirmed, long length,
- ByteBuf data) {
- ByteBuf headersBuffer;
+ public ReferenceCounted computeDigestAndPackageForSending(long entryId,
long lastAddConfirmed, long length,
+ ByteBuf data,
byte[] masterKey, int flags) {
if (this.useV2Protocol) {
- headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
+ return computeDigestAndPackageForSendingV2(entryId,
lastAddConfirmed, length, data, masterKey, flags);
+ } else {
+ return computeDigestAndPackageForSendingV3(entryId,
lastAddConfirmed, length, data);
+ }
+ }
+
+ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId,
long lastAddConfirmed, long length,
+ ByteBuf data,
byte[] masterKey, int flags) {
+ boolean isSmallEntry = data.readableBytes() <
BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
+
+ int headersSize = 4 // Request header
+ + BookieProtocol.MASTER_KEY_LENGTH // for the master
key
+ + METADATA_LENGTH //
+ + macCodeLength;
+ int payloadSize = data.readableBytes();
+ int bufferSize = 4 + headersSize + (isSmallEntry ? payloadSize : 0);
+
+ ByteBuf buf = allocator.buffer(bufferSize, bufferSize);
+ buf.writeInt(headersSize + payloadSize);
+ buf.writeInt(
+ BookieProtocol.PacketHeader.toInt(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION,
BookieProtocol.ADDENTRY, (short) flags));
+ buf.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
+
+ // The checksum is computed on the next part of the buffer only
+ buf.readerIndex(buf.writerIndex());
+ buf.writeLong(ledgerId);
+ buf.writeLong(entryId);
+ buf.writeLong(lastAddConfirmed);
+ buf.writeLong(length);
+
+ // Compute checksum over the headers
+ update(buf);
+
+ // don't unwrap slices
+ final ByteBuf unwrapped = data.unwrap() != null && data.unwrap()
instanceof CompositeByteBuf
+ ? data.unwrap() : data;
+ ReferenceCountUtil.retain(unwrapped);
+ ReferenceCountUtil.safeRelease(data);
+
+ if (unwrapped instanceof CompositeByteBuf) {
+ ((CompositeByteBuf) unwrapped).forEach(this::update);
} else {
- headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
+ update(unwrapped);
}
+
+ populateValueAndReset(buf);
+
+ // Reset the reader index to the beginning
+ buf.readerIndex(0);
+
+ if (isSmallEntry) {
+ buf.writeBytes(data);
+ data.release();
+ return buf;
+ } else {
+ return ByteBufList.get(buf, data);
Review Comment:
ByteBufList.get(buf, unwrap)?
##########
bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java:
##########
@@ -96,14 +99,72 @@ public static byte[] generateMasterKey(byte[] password)
throws NoSuchAlgorithmEx
* @param data
* @return
*/
- public ByteBufList computeDigestAndPackageForSending(long entryId, long
lastAddConfirmed, long length,
- ByteBuf data) {
- ByteBuf headersBuffer;
+ public ReferenceCounted computeDigestAndPackageForSending(long entryId,
long lastAddConfirmed, long length,
+ ByteBuf data,
byte[] masterKey, int flags) {
if (this.useV2Protocol) {
- headersBuffer = allocator.buffer(METADATA_LENGTH + macCodeLength);
+ return computeDigestAndPackageForSendingV2(entryId,
lastAddConfirmed, length, data, masterKey, flags);
+ } else {
+ return computeDigestAndPackageForSendingV3(entryId,
lastAddConfirmed, length, data);
+ }
+ }
+
+ private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId,
long lastAddConfirmed, long length,
+ ByteBuf data,
byte[] masterKey, int flags) {
+ boolean isSmallEntry = data.readableBytes() <
BookieProtoEncoding.SMALL_ENTRY_SIZE_THRESHOLD;
+
+ int headersSize = 4 // Request header
+ + BookieProtocol.MASTER_KEY_LENGTH // for the master
key
+ + METADATA_LENGTH //
+ + macCodeLength;
+ int payloadSize = data.readableBytes();
+ int bufferSize = 4 + headersSize + (isSmallEntry ? payloadSize : 0);
+
+ ByteBuf buf = allocator.buffer(bufferSize, bufferSize);
+ buf.writeInt(headersSize + payloadSize);
+ buf.writeInt(
+ BookieProtocol.PacketHeader.toInt(
+ BookieProtocol.CURRENT_PROTOCOL_VERSION,
BookieProtocol.ADDENTRY, (short) flags));
+ buf.writeBytes(masterKey, 0, BookieProtocol.MASTER_KEY_LENGTH);
+
+ // The checksum is computed on the next part of the buffer only
+ buf.readerIndex(buf.writerIndex());
+ buf.writeLong(ledgerId);
+ buf.writeLong(entryId);
+ buf.writeLong(lastAddConfirmed);
+ buf.writeLong(length);
+
+ // Compute checksum over the headers
+ update(buf);
+
+ // don't unwrap slices
+ final ByteBuf unwrapped = data.unwrap() != null && data.unwrap()
instanceof CompositeByteBuf
+ ? data.unwrap() : data;
+ ReferenceCountUtil.retain(unwrapped);
+ ReferenceCountUtil.safeRelease(data);
+
+ if (unwrapped instanceof CompositeByteBuf) {
+ ((CompositeByteBuf) unwrapped).forEach(this::update);
} else {
- headersBuffer = Unpooled.buffer(METADATA_LENGTH + macCodeLength);
+ update(unwrapped);
}
+
+ populateValueAndReset(buf);
+
+ // Reset the reader index to the beginning
+ buf.readerIndex(0);
+
+ if (isSmallEntry) {
+ buf.writeBytes(data);
Review Comment:
buf.writeBytes(unwrap) ?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]