lhotari commented on code in PR #25817:
URL: https://github.com/apache/pulsar/pull/25817#discussion_r3265915097
##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java:
##########
@@ -462,14 +462,22 @@ private CompletableFuture<Void>
addToCompactedLedger(LedgerHandle lh, RawMessage
return bkf;
}
+ /**
+ * Extract the partition key and the payload size for a non-batch message.
+ *
+ * @return a pair of (partitionKey, payloadSize), or null if the message has
no partition key.
+ */
protected Pair<String, Integer> extractKeyAndSize(RawMessage m,
MessageMetadata msgMetadata) {
- ByteBuf headersAndPayload = m.getHeadersAndPayload();
if (msgMetadata.hasPartitionKey()) {
- int size = headersAndPayload.readableBytes();
+ int payloadSize;
if (msgMetadata.hasUncompressedSize()) {
- size = msgMetadata.getUncompressedSize();
+ payloadSize = msgMetadata.getUncompressedSize();
+ } else {
+ ByteBuf headersAndPayload = m.getHeadersAndPayload();
+ Commands.skipMessageMetadata(headersAndPayload);
+ payloadSize = headersAndPayload.readableBytes();
Review Comment:
It's better to call `ByteBuf.duplicate()` so that the original
headerAndPayload buffer's readIndex doesn't advance:
```suggestion
ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
Commands.skipMessageMetadata(headersAndPayload);
payloadSize = headersAndPayload.readableBytes();
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]