This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new dd88f536645 [fix] [broker] Fix write all compacted out entry into compacted topic (#21917)
dd88f536645 is described below

commit dd88f536645b04ded655726e98634336e7930edc
Author: thetumbled <[email protected]>
AuthorDate: Sun Jan 21 19:47:40 2024 +0800

    [fix] [broker] Fix write all compacted out entry into compacted topic (#21917)
---
 .../pulsar/client/impl/RawBatchConverter.java      |  6 ++-
 .../apache/pulsar/compaction/CompactorTest.java    | 56 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
index b3c9d7c9f2b..94fbf3365b2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawBatchConverter.java
@@ -130,7 +130,11 @@ public class RawBatchConverter {
                                                       msg.getMessageIdData().getEntryId(),
                                                       msg.getMessageIdData().getPartition(),
                                                       i);
-                if (!singleMessageMetadata.hasPartitionKey()) {
+                if (singleMessageMetadata.isCompactedOut()) {
+                    // we may read compacted out message from the compacted topic
+                    Commands.serializeSingleMessageInBatchWithPayload(emptyMetadata,
+                            Unpooled.EMPTY_BUFFER, batchBuffer);
+                } else if (!singleMessageMetadata.hasPartitionKey()) {
                     if (retainNullKey) {
                         messagesRetained++;
                         Commands.serializeSingleMessageInBatchWithPayload(singleMessageMetadata,
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
index e3a788f36c1..ab7878a8b7d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactorTest.java
@@ -35,6 +35,8 @@ import java.util.Random;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
 import lombok.Cleanup;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.LedgerEntry;
@@ -45,10 +47,15 @@ import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.MessageRoutingMode;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.ConnectionPool;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.RawMessageImpl;
@@ -160,6 +167,55 @@ public class CompactorTest extends MockedPulsarServiceBaseTest {
         compactAndVerify(topic, expected, true);
     }
 
+    @Test
+    public void testAllCompactedOut() throws Exception {
+        String topicName = "persistent://my-property/use/my-ns/testAllCompactedOut";
+        // set retain null key to true
+        boolean oldRetainNullKey = pulsar.getConfig().isTopicCompactionRetainNullKey();
+        pulsar.getConfig().setTopicCompactionRetainNullKey(true);
+        this.restartBroker();
+
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .enableBatching(true).topic(topicName).batchingMaxMessages(3).create();
+
+        producer.newMessage().key("K1").value("V1").sendAsync();
+        producer.newMessage().key("K2").value("V2").sendAsync();
+        producer.newMessage().key("K2").value(null).sendAsync();
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        producer.newMessage().key("K1").value(null).sendAsync();
+        producer.flush();
+
+        admin.topics().triggerCompaction(topicName);
+
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(admin.topics().compactionStatus(topicName).status,
+                    LongRunningProcessStatus.Status.SUCCESS);
+        });
+
+        @Cleanup
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .subscriptionName("reader-test")
+                .topic(topicName)
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .create();
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext(3, TimeUnit.SECONDS);
+            Assert.assertNotNull(message);
+        }
+        // set retain null key back to avoid affecting other tests
+        pulsar.getConfig().setTopicCompactionRetainNullKey(oldRetainNullKey);
+    }
+
     @Test
     public void testCompactAddCompact() throws Exception {
         String topic = "persistent://my-property/use/my-ns/my-topic1";

Reply via email to