Denovo1998 commented on code in PR #25817:
URL: https://github.com/apache/pulsar/pull/25817#discussion_r3303886313


##########
pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java:
##########
@@ -640,6 +640,80 @@ public void testBatchMessageWithNullValue() throws Exception {
         assertEquals(messages.get(2).getKey(), "key5");
     }
 
+    /**
+     * Write raw non-batch entries directly to the managed ledger, simulating
+     * messages from C++/Python clients that do not set numMessagesInBatch.
+     * Verifies that null-value tombstones remove keys during compaction.

Review Comment:
   Small wording nit: this comment says the raw entries simulate C++/Python 
clients because they do not set `numMessagesInBatch`. The Java non-batch send 
path also does not set `MessageMetadata.numMessagesInBatch`; Java avoids the 
old bug mainly because `ProducerImpl#updateMessageMetadata` sets 
`uncompressedSize=0` for null payloads.
   
   Could we reword this as "raw non-batch entries without `uncompressedSize`, 
as seen with some non-Java clients" so future readers don't infer that 
`numMessagesInBatch` is what protects Java non-batch messages?
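To illustrate the distinction, here is a minimal sketch of the pre-fix size fallback. It uses a hypothetical `SketchMetadata` class, not Pulsar's `MessageMetadata`. It also assumes, as the new `skipMessageMetadata(...)` call suggests, that the entry buffer still contains the serialized metadata. The 40-byte metadata size is illustrative only. Neither message sets `numMessagesInBatch`, and only `uncompressedSize` changes the outcome:

```java
import java.util.OptionalInt;

/** Hypothetical stand-in for two MessageMetadata fields; not Pulsar's class. */
final class SketchMetadata {
    final OptionalInt numMessagesInBatch; // unset on both non-batch paths
    final OptionalInt uncompressedSize;   // the Java producer sets 0 for null payloads

    SketchMetadata(OptionalInt numMessagesInBatch, OptionalInt uncompressedSize) {
        this.numMessagesInBatch = numMessagesInBatch;
        this.uncompressedSize = uncompressedSize;
    }
}

final class OldSizeSketch {
    /**
     * Models the pre-fix fallback: use uncompressedSize when present, otherwise
     * count every readable byte of the entry (assumed to include the metadata).
     * numMessagesInBatch is never consulted.
     */
    static int oldSize(SketchMetadata md, int metadataBytes, int payloadBytes) {
        return md.uncompressedSize.orElse(metadataBytes + payloadBytes);
    }

    public static void main(String[] args) {
        int metadataBytes = 40; // illustrative value, not measured from a real entry
        // Java-style null-value message: no numMessagesInBatch, uncompressedSize = 0.
        SketchMetadata javaStyle = new SketchMetadata(OptionalInt.empty(), OptionalInt.of(0));
        // Raw non-batch entry as seen with some non-Java clients: neither field set.
        SketchMetadata rawEntry = new SketchMetadata(OptionalInt.empty(), OptionalInt.empty());

        System.out.println(oldSize(javaStyle, metadataBytes, 0)); // 0: treated as a tombstone
        System.out.println(oldSize(rawEntry, metadataBytes, 0));  // 40: key wrongly kept
    }
}
```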



##########
pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java:
##########
@@ -462,14 +462,22 @@ private CompletableFuture<Void> addToCompactedLedger(LedgerHandle lh, RawMessage
     return bkf;
   }
 
+  /**
+   * Extract the partition key and the payload size for a non-batch message.
+   *
+   * @return a pair of (partitionKey, payloadSize), or null if the message has no partition key.
+   */
   protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) {
-    ByteBuf headersAndPayload = m.getHeadersAndPayload();
     if (msgMetadata.hasPartitionKey()) {
-      int size = headersAndPayload.readableBytes();
+      int payloadSize;
       if (msgMetadata.hasUncompressedSize()) {
-        size = msgMetadata.getUncompressedSize();
+        payloadSize = msgMetadata.getUncompressedSize();
+      } else {
+        ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
+        Commands.skipMessageMetadata(headersAndPayload);
+        payloadSize = headersAndPayload.readableBytes();

Review Comment:
   Should we check `msgMetadata.hasNullValue() && msgMetadata.isNullValue()` 
before deriving the size from payload bytes?
   
   The new payload-only calculation fixes plain non-batch tombstones, but encrypted tombstones can still survive compaction when the producer does not set `uncompressedSize`. Encryption can turn an empty/null payload into non-empty ciphertext, so after `skipMessageMetadata(...)`, `readableBytes()` may still be greater than 0, and phase one will put the tombstone into `latestForKey` instead of removing the key.
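To make the encryption point concrete, this sketch encrypts an empty payload with AES-GCM using only the JDK's `javax.crypto` API. AES-GCM is chosen here for illustration, and the sketch does not call Pulsar's message-crypto code. Zero bytes of plaintext still produce a 16-byte authentication tag, so the "payload" a compactor sees is non-empty:

```java
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

final class EncryptedTombstoneSketch {
    /** Encrypts an empty payload with AES-GCM and returns the ciphertext length. */
    static int ciphertextLengthOfEmptyPayload() throws Exception {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(128);
        SecretKey key = keyGen.generateKey();

        byte[] iv = new byte[12]; // 96-bit nonce, the usual size for GCM
        new SecureRandom().nextBytes(iv);

        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        // Zero bytes of plaintext still yield a 128-bit (16-byte) authentication tag.
        return cipher.doFinal(new byte[0]).length;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(ciphertextLengthOfEmptyPayload()); // 16
    }
}
```

Ciphers that pad, such as AES-CBC with PKCS#5 padding, also turn an empty input into a non-empty output. Payload length alone therefore cannot identify an encrypted tombstone.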
   
   Java producers usually avoid this because 
`ProducerImpl#updateMessageMetadata` sets `uncompressedSize=0` for null 
payloads, but this PR is specifically fixing producers whose metadata does not 
include that size. The explicit `nullValue` flag is the protocol-level 
tombstone signal, so I think this helper should return size 0 for 
`nullValue=true` before falling back to `uncompressedSize` or payload bytes.
   
   ```java
   if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) {
       // Explicit tombstone: ignore any (possibly encrypted) payload bytes.
       payloadSize = 0;
   } else if (msgMetadata.hasUncompressedSize()) {
       payloadSize = msgMetadata.getUncompressedSize();
   } else {
       // No size in the metadata: measure only the payload, after skipping the metadata.
       ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate();
       Commands.skipMessageMetadata(headersAndPayload);
       payloadSize = headersAndPayload.readableBytes();
   }
   ```
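Here is a self-contained sketch of that precedence and of the phase-one bookkeeping it feeds. `SketchEntry`, `PhaseOneSketch`, and the put/remove loop are hypothetical simplifications of `RawMessage`, `MessageMetadata`, and the `latestForKey` handling described in this comment, not the broker's real code. The 16-byte payload stands in for the ciphertext of an empty value:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;

/** Hypothetical, simplified view of one non-batch entry; not Pulsar's RawMessage. */
final class SketchEntry {
    final String key;
    final long entryId;
    final boolean hasNullValue;          // models msgMetadata.hasNullValue()
    final boolean nullValue;             // models msgMetadata.isNullValue()
    final OptionalInt uncompressedSize;  // models hasUncompressedSize()/getUncompressedSize()
    final int payloadBytesAfterMetadata; // readableBytes() after skipping the metadata

    SketchEntry(String key, long entryId, boolean hasNullValue, boolean nullValue,
                OptionalInt uncompressedSize, int payloadBytesAfterMetadata) {
        this.key = key;
        this.entryId = entryId;
        this.hasNullValue = hasNullValue;
        this.nullValue = nullValue;
        this.uncompressedSize = uncompressedSize;
        this.payloadBytesAfterMetadata = payloadBytesAfterMetadata;
    }
}

final class PhaseOneSketch {
    /** Suggested precedence: nullValue flag, then uncompressedSize, then payload bytes. */
    static int payloadSize(SketchEntry e) {
        if (e.hasNullValue && e.nullValue) {
            return 0; // protocol-level tombstone, regardless of payload bytes
        } else if (e.uncompressedSize.isPresent()) {
            return e.uncompressedSize.getAsInt();
        } else {
            return e.payloadBytesAfterMetadata;
        }
    }

    /** Simplified phase one: keep the latest entry id per key, drop keys whose latest entry is empty. */
    static Map<String, Long> latestForKey(List<SketchEntry> entries) {
        Map<String, Long> latest = new HashMap<>();
        for (SketchEntry e : entries) {
            if (payloadSize(e) > 0) {
                latest.put(e.key, e.entryId);
            } else {
                latest.remove(e.key);
            }
        }
        return latest;
    }

    public static void main(String[] args) {
        List<SketchEntry> entries = Arrays.asList(
                new SketchEntry("k", 1L, false, false, OptionalInt.empty(), 5),
                new SketchEntry("j", 2L, false, false, OptionalInt.empty(), 7),
                // Encrypted tombstone for "k": nullValue=true, no uncompressedSize, 16 ciphertext bytes.
                new SketchEntry("k", 3L, true, true, OptionalInt.empty(), 16));
        System.out.println(latestForKey(entries)); // {j=2}
    }
}
```

Dropping the first branch of `payloadSize` makes the last `k` entry count as a 16-byte value, so `k` would stay in the map. That is the failure mode this comment is about.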



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
