lhotari commented on code in PR #25817:
URL: https://github.com/apache/pulsar/pull/25817#discussion_r3265915097


##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java:
##########
@@ -462,14 +462,22 @@ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage
     return bkf;
   }
 
+  /**
+   * Extract the partition key and the payload size for a non-batch message.
+   *
+   * @return a pair of (partitionKey, payloadSize), or null if the message has no partition key.
+   */
   protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) {
-    ByteBuf headersAndPayload = m.getHeadersAndPayload();
     if (msgMetadata.hasPartitionKey()) {
-      int size = headersAndPayload.readableBytes();
+      int payloadSize;
       if (msgMetadata.hasUncompressedSize()) {
-        size = msgMetadata.getUncompressedSize();
+        payloadSize = msgMetadata.getUncompressedSize();
+      } else {
+        ByteBuf headersAndPayload = m.getHeadersAndPayload();
+        Commands.skipMessageMetadata(headersAndPayload);
+        payloadSize = headersAndPayload.readableBytes();

Review Comment:
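   `Commands.skipMessageMetadata` advances the reader index of the buffer it is
   given, so it's better to call `ByteBuf.duplicate()` first. That way the
   original `headersAndPayload` buffer's `readerIndex` doesn't advance: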
   ```suggestion
           ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
           Commands.skipMessageMetadata(headersAndPayload);
           payloadSize = headersAndPayload.readableBytes();
   ```
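
The effect of the suggestion can be sketched with the JDK's `java.nio.ByteBuffer`, whose `duplicate()` likewise shares content with the original while keeping an independent position. This is only an analogy for Netty's `ByteBuf.duplicate()`: the class `DuplicateDemo`, the helper `skipMetadata`, and the 4-byte length-prefix layout are hypothetical stand-ins, not Pulsar's actual `Commands.skipMessageMetadata` or wire format.

```java
import java.nio.ByteBuffer;

public class DuplicateDemo {
    // Hypothetical stand-in for Commands.skipMessageMetadata:
    // reads a 4-byte length prefix, then skips that many metadata bytes.
    static void skipMetadata(ByteBuffer buf) {
        int metadataSize = buf.getInt();
        buf.position(buf.position() + metadataSize);
    }

    // Measures the payload on a duplicate, so the caller's buffer
    // keeps its original read position.
    static int payloadSize(ByteBuffer headersAndPayload) {
        ByteBuffer view = headersAndPayload.duplicate();
        skipMetadata(view);
        return view.remaining();
    }

    // Builds an assumed layout: 4-byte length, 3 metadata bytes, 5 payload bytes.
    static ByteBuffer sample() {
        ByteBuffer buf = ByteBuffer.allocate(4 + 3 + 5);
        buf.putInt(3).put(new byte[3]).put(new byte[5]);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer original = sample();
        int size = payloadSize(original);
        // The duplicate's position moved past the metadata; the original's did not.
        System.out.println(size + " " + original.position()); // prints "5 0"
    }
}
```

Without the `duplicate()` call, `original.position()` would be 7 after the measurement in this sketch, and any later reader of the same buffer would start in the middle of the message.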



