This is an automated email from the ASF dual-hosted git repository.

zhaocong pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new fb60e9189f0 [fix][broker] Fix message loss during topic compaction (#20980)
fb60e9189f0 is described below

commit fb60e9189f06af75facca2fb7eab7ffe05109a77
Author: Cong Zhao <[email protected]>
AuthorDate: Mon Aug 14 20:33:29 2023 +0800

    [fix][broker] Fix message loss during topic compaction (#20980)
    
    (cherry picked from commit 3ab420cd81c31ebd16213e14580d9e317bc0698d)
---
 .../pulsar/client/impl/RawBatchConverter.java      |  8 +++-
 .../pulsar/compaction/TwoPhaseCompactor.java       | 18 ++++---
 .../apache/pulsar/compaction/CompactionTest.java   | 56 ++++++++++++++++++++++
 3 files changed, 74 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index 4d718f71a2e..0436ebdeed4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -42,6 +42,10 @@ public class RawBatchConverter {
     public static boolean isReadableBatch(RawMessage msg) {
         ByteBuf payload = msg.getHeadersAndPayload();
         MessageMetadata metadata = Commands.parseMessageMetadata(payload);
+        return isReadableBatch(metadata);
+    }
+
+    public static boolean isReadableBatch(MessageMetadata metadata) {
         return metadata.hasNumMessagesInBatch() && 
metadata.getEncryptionKeysCount() == 0;
     }
 
@@ -69,9 +73,9 @@ public class RawBatchConverter {
                                                   
msg.getMessageIdData().getEntryId(),
                                                   
msg.getMessageIdData().getPartition(),
                                                   i);
-            if (!smm.isCompactedOut()) {
+            if (!smm.isCompactedOut() && smm.hasPartitionKey()) {
                 idsAndKeysAndSize.add(ImmutableTriple.of(id,
-                        smm.hasPartitionKey() ? smm.getPartitionKey() : null,
+                        smm.getPartitionKey(),
                         smm.hasPayloadSize() ? smm.getPayloadSize() : 0));
             }
             singleMessagePayload.release();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
index 821dd9c0c9d..a8c4cbe1e4d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/TwoPhaseCompactor.java
@@ -126,22 +126,28 @@ public class TwoPhaseCompactor extends Compactor {
                 boolean deletedMessage = false;
                 boolean replaceMessage = false;
                 mxBean.addCompactionReadOp(reader.getTopic(), 
m.getHeadersAndPayload().readableBytes());
-                if (RawBatchConverter.isReadableBatch(m)) {
+                MessageMetadata metadata = 
Commands.parseMessageMetadata(m.getHeadersAndPayload());
+                if (RawBatchConverter.isReadableBatch(metadata)) {
                     try {
+                        int numMessagesInBatch = 
metadata.getNumMessagesInBatch();
+                        int deleteCnt = 0;
                         for (ImmutableTriple<MessageId, String, Integer> e : 
RawBatchConverter
                                 .extractIdsAndKeysAndSize(m)) {
                             if (e != null) {
                                 if (e.getRight() > 0) {
                                     MessageId old = 
latestForKey.put(e.getMiddle(), e.getLeft());
-                                    replaceMessage = old != null;
+                                    if (old != null) {
+                                        
mxBean.addCompactionRemovedEvent(reader.getTopic());
+                                    }
                                 } else {
-                                    deletedMessage = true;
                                     latestForKey.remove(e.getMiddle());
+                                    deleteCnt++;
+                                    
mxBean.addCompactionRemovedEvent(reader.getTopic());
                                 }
                             }
-                            if (replaceMessage || deletedMessage) {
-                                
mxBean.addCompactionRemovedEvent(reader.getTopic());
-                            }
+                        }
+                        if (deleteCnt == numMessagesInBatch) {
+                            deletedMessage = true;
                         }
                     } catch (IOException ioe) {
                         log.info("Error decoding batch for message {}. Whole 
batch will be included in output",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
index c8105b01125..c5dbd9c49aa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactionTest.java
@@ -48,6 +48,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.api.OpenBuilder;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
@@ -96,6 +97,7 @@ import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-impl")
+@Slf4j
 public class CompactionTest extends MockedPulsarServiceBaseTest {
     protected ScheduledExecutorService compactionScheduler;
     protected BookKeeper bk;
@@ -553,6 +555,60 @@ public class CompactionTest extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testBatchMessageWithNullValue() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/my-topic1";
+
+        pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
+                .receiverQueueSize(1).readCompacted(true).subscribe().close();
+
+        try (Producer<byte[]> producer = 
pulsarClient.newProducer().topic(topic)
+            .maxPendingMessages(3)
+            .enableBatching(true)
+            .batchingMaxMessages(3)
+            .batchingMaxPublishDelay(1, TimeUnit.HOURS)
+            .messageRoutingMode(MessageRoutingMode.SinglePartition)
+            .create()
+        ) {
+            // batch 1
+            
producer.newMessage().key("key1").value("my-message-1".getBytes()).sendAsync();
+            producer.newMessage().key("key1").value(null).sendAsync();
+            
producer.newMessage().key("key2").value("my-message-3".getBytes()).send();
+
+            // batch 2
+            
producer.newMessage().key("key3").value("my-message-4".getBytes()).sendAsync();
+            
producer.newMessage().key("key3").value("my-message-5".getBytes()).sendAsync();
+            
producer.newMessage().key("key3").value("my-message-6".getBytes()).send();
+
+            // batch 3
+            
producer.newMessage().key("key4").value("my-message-7".getBytes()).sendAsync();
+            producer.newMessage().key("key4").value(null).sendAsync();
+            
producer.newMessage().key("key5").value("my-message-9".getBytes()).send();
+        }
+
+
+        // compact the topic
+        compact(topic);
+
+        // Read messages before compaction to get ids
+        List<Message<byte[]>> messages = new ArrayList<>();
+        try (Consumer<byte[]> consumer = 
pulsarClient.newConsumer().topic(topic)
+             
.subscriptionName("sub1").receiverQueueSize(1).readCompacted(true).subscribe()) 
{
+            while (true) {
+                Message<byte[]> message = consumer.receive(5, 
TimeUnit.SECONDS);
+                if (message ==  null) {
+                    break;
+                }
+                messages.add(message);
+            }
+        }
+
+        assertEquals(messages.size(), 3);
+        assertEquals(messages.get(0).getKey(), "key2");
+        assertEquals(messages.get(1).getKey(), "key3");
+        assertEquals(messages.get(2).getKey(), "key5");
+    }
+
     @Test
     public void testWholeBatchCompactedOut() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";

Reply via email to