This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.2 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 3629ba3ca2dab191cfd4fbbd2bbc8fe6e13496f0 Author: grishaf <[email protected]> AuthorDate: Fri May 29 10:13:15 2026 +0300 [fix][broker] Fix non-batched null-value messages not removed during topic compaction (#25817) (cherry picked from commit 1fa9e3532b4c8978b25f16b43855948b54e95d17) --- .../compaction/AbstractTwoPhaseCompactor.java | 20 ++++-- .../pulsar/compaction/EventTimeOrderCompactor.java | 14 ++-- .../apache/pulsar/compaction/CompactionTest.java | 74 ++++++++++++++++++++++ 3 files changed, 93 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java index bdcdb50eb5f..095c4861ffe 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/AbstractTwoPhaseCompactor.java @@ -449,14 +449,24 @@ public abstract class AbstractTwoPhaseCompactor<T> extends Compactor { return bkf; } + /** + * Extract the partition key and the payload size for a non-batch message. + * + * @return a pair of (partitionKey, payloadSize), or null if the message has no partition key. + */ protected Pair<String, Integer> extractKeyAndSize(RawMessage m, MessageMetadata msgMetadata) { - ByteBuf headersAndPayload = m.getHeadersAndPayload(); if (msgMetadata.hasPartitionKey()) { - int size = headersAndPayload.readableBytes(); - if (msgMetadata.hasUncompressedSize()) { - size = msgMetadata.getUncompressedSize(); + int payloadSize; + if (msgMetadata.hasNullValue() && msgMetadata.isNullValue()) { + payloadSize = 0; + } else if (msgMetadata.hasUncompressedSize()) { + payloadSize = msgMetadata.getUncompressedSize(); + } else { + ByteBuf headersAndPayload = m.getHeadersAndPayload().duplicate(); + Commands.skipMessageMetadata(headersAndPayload); + payloadSize = headersAndPayload.readableBytes(); } - return Pair.of(msgMetadata.getPartitionKey(), size); + return Pair.of(msgMetadata.getPartitionKey(), payloadSize); } else { return null; } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java index db129b54533..ad6b5f28d30 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java @@ -18,7 +18,6 @@ */ package org.apache.pulsar.compaction; -import io.netty.buffer.ByteBuf; import java.io.IOException; import java.util.List; import java.util.Map; @@ -139,17 +138,12 @@ public class EventTimeOrderCompactor extends AbstractTwoPhaseCompactor<Pair<Mess } protected MessageCompactionData extractMessageCompactionData(RawMessage m, MessageMetadata metadata) { - ByteBuf headersAndPayload = m.getHeadersAndPayload(); - if (metadata.hasPartitionKey()) { - int size = headersAndPayload.readableBytes(); - if (metadata.hasUncompressedSize()) { - size = metadata.getUncompressedSize(); - } - return new MessageCompactionData(m.getMessageId(), metadata.getPartitionKey(), - size, metadata.getEventTime()); - } else { + Pair<String, Integer> keyAndSize = extractKeyAndSize(m, metadata); + if (keyAndSize == null) { return null; } + return new MessageCompactionData(m.getMessageId(), keyAndSize.getLeft(), + keyAndSize.getRight(), metadata.getEventTime()); } private List<MessageCompactionData> extractMessageCompactionDataFromBatch(RawMessage msg, MessageMetadata metadata) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java index 5fc6416ee98..e5ccec35e9f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java @@ -640,6 +640,80 @@ public class CompactionTest extends MockedPulsarServiceBaseTest { assertEquals(messages.get(2).getKey(), "key5"); } + /** + * Write raw non-batch entries directly to the managed ledger without + * uncompressedSize, as seen with some non-Java clients. Verifies that + * null-value tombstones remove keys during compaction. + */ + @Test + public void testNonBatchedMessageWithNullValue() throws Exception { + String topic = "persistent://my-tenant/my-ns/non-batched-message-with-null-value"; + + admin.topics().createNonPartitionedTopic(topic); + pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .receiverQueueSize(1).readCompacted(true).subscribe().close(); + + PersistentTopic persistentTopic = + (PersistentTopic) pulsar.getBrokerService().getTopic(topic, false).join().get(); + ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); + + long seqId = 0; + + // key1: value then null-value tombstone + ml.addEntry(buildNonBatchEntry("key1", "my-message-1".getBytes(), seqId++)); + ml.addEntry(buildNonBatchEntry("key1", null, seqId++)); + + // key2: value only (should survive) + ml.addEntry(buildNonBatchEntry("key2", "my-message-3".getBytes(), seqId++)); + + // key3: value then null-value tombstone + ml.addEntry(buildNonBatchEntry("key3", "my-message-4".getBytes(), seqId++)); + ml.addEntry(buildNonBatchEntry("key3", null, seqId++)); + + // key4: value only (should survive) + ml.addEntry(buildNonBatchEntry("key4", "my-message-6".getBytes(), seqId++)); + + compact(topic); + + List<Message<byte[]>> messages = new ArrayList<>(); + try (Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic) + .subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) { + while (true) { + Message<byte[]> message = consumer.receive(5, TimeUnit.SECONDS); + if (message == null) { + break; + } + messages.add(message); + } + } + + assertEquals(messages.size(), 2); + assertEquals(messages.get(0).getKey(), "key2"); + assertEquals(messages.get(1).getKey(), "key4"); + } + + private byte[] buildNonBatchEntry(String key, byte[] payload, long sequenceId) { + org.apache.pulsar.common.api.proto.MessageMetadata metadata = + new org.apache.pulsar.common.api.proto.MessageMetadata(); + metadata.setPartitionKey(key); + metadata.setPublishTime(System.currentTimeMillis()); + metadata.setProducerName("test-non-batch"); + metadata.setSequenceId(sequenceId); + if (payload == null) { + metadata.setNullValue(true); + } + ByteBuf payloadBuf = io.netty.buffer.Unpooled.wrappedBuffer( + payload != null ? payload : new byte[0]); + ByteBuf entry = org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload( + org.apache.pulsar.common.protocol.Commands.ChecksumType.Crc32c, + metadata, payloadBuf); + byte[] bytes = new byte[entry.readableBytes()]; + entry.readBytes(bytes); + entry.release(); + payloadBuf.release(); + return bytes; + } + @Test public void testWholeBatchCompactedOut() throws Exception { String topic = "persistent://my-tenant/my-ns/whole-batch-compacted-out";
