This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c710c187d266f0968ad48601636c653912ad5b49
Author: hangc0276 <[email protected]>
AuthorDate: Fri Jul 2 21:34:10 2021 +0800

    fix compaction entry read exception (#11175)
    
    
    (cherry picked from commit c716495a4282093bd81f9cd43f909811213bfb32)
---
 .../pulsar/compaction/CompactedTopicImpl.java      |  1 +
 .../pulsar/compaction/CompactedTopicTest.java      | 57 +++++++++++++++++++++-
 2 files changed, 57 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index e0755a7..12748e8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -109,6 +109,7 @@ public class CompactedTopicImpl implements CompactedTopic {
                                 if (startPoint == NEWER_THAN_COMPACTED) {
                                     cursor.seek(compactionHorizon.getNext());
                                     
callback.readEntriesComplete(Collections.emptyList(), ctx);
+                                    return 
CompletableFuture.completedFuture(null);
                                 }
                                 return readEntries(context.ledger, startPoint, 
endPoint)
                                     .thenAccept((entries) -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 2d3411b..410d1e5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -46,14 +46,16 @@ import org.apache.commons.lang3.tuple.Triple;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
 import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
 import org.apache.pulsar.client.api.RawMessage;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.RawMessageImpl;
 import org.apache.pulsar.common.api.proto.MessageIdData;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.testng.Assert;
@@ -305,4 +307,57 @@ public class CompactedTopicTest extends MockedPulsarServiceBaseTest {
 
         producer.close();
     }
+
+    @Test(timeOut = 30000)
+    public void testReadMessageFromCompactedLedger() throws Exception {
+        final String key = "1";
+        String msg = "test compaction msg";
+        final String topic = 
"persistent://my-property/use/my-ns/testCompactWithEmptyMessage-" + 
UUID.randomUUID();
+        admin.topics().createPartitionedTopic(topic, 1);
+        final int numMessages = 10;
+
+        Producer<String> producer = 
pulsarClient.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
+        for (int i = 0; i < numMessages; ++i) {
+            producer.newMessage().key(key).value(msg).send();
+        }
+
+        admin.topics().triggerCompaction(topic);
+        boolean succeed = retryStrategically((test) -> {
+            try {
+                return 
LongRunningProcessStatus.Status.SUCCESS.equals(admin.topics().compactionStatus(topic).status);
+            } catch (PulsarAdminException e) {
+                return false;
+            }
+        }, 10, 200);
+
+        Assert.assertTrue(succeed);
+
+
+        final String newKey = "2";
+        String newMsg = "test compaction msg v2";
+        for (int i = 0; i < numMessages; ++i) {
+            producer.newMessage().key(newKey).value(newMsg).send();
+        }
+
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("test")
+                .readCompacted(true)
+                .startMessageId(MessageId.earliest)
+                .create();
+
+        int compactedMsgCount = 0;
+        int nonCompactedMsgCount = 0;
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext();
+            if (key.equals(message.getKey()) && 
msg.equals(message.getValue())) {
+                compactedMsgCount++;
+            } else if (newKey.equals(message.getKey()) && 
newMsg.equals(message.getValue())) {
+                nonCompactedMsgCount++;
+            }
+        }
+
+        Assert.assertEquals(compactedMsgCount, 1);
+        Assert.assertEquals(nonCompactedMsgCount, numMessages);
+    }
 }

Reply via email to