This is an automated email from the ASF dual-hosted git repository.

heesung pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 140ca5e34ae [fix] [branch-3.0] Fix reader stuck when read from 
compacted topic with read compact mode disable (#22199)
140ca5e34ae is described below

commit 140ca5e34aeccf5657fb056d1fc6f6011ff6c3a7
Author: thetumbled <[email protected]>
AuthorDate: Thu Mar 7 05:37:31 2024 +0800

    [fix] [branch-3.0] Fix reader stuck when read from compacted topic with 
read compact mode disable (#22199)
---
 .../apache/pulsar/broker/service/ServerCnx.java    | 33 +++++++++++++++++-----
 .../compaction/GetLastMessageIdCompactedTest.java  | 27 ++++++++++++++++++
 2 files changed, 53 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 57081863a14..d40a4ec789f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -2077,7 +2077,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
                         (PositionImpl) markDeletePosition,
                         partitionIndex,
                         requestId,
-                        consumer.getSubscription().getName());
+                        consumer.getSubscription().getName(),
+                        consumer.readCompacted());
             }).exceptionally(e -> {
                 
writeAndFlush(Commands.newError(getLastMessageId.getRequestId(),
                         ServerError.UnknownError, "Failed to recover 
Transaction Buffer."));
@@ -2095,16 +2096,33 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
             PositionImpl markDeletePosition,
             int partitionIndex,
             long requestId,
-            String subscriptionName) {
-
+            String subscriptionName,
+            boolean readCompacted) {
         PersistentTopic persistentTopic = (PersistentTopic) topic;
         ManagedLedgerImpl ml = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
 
         // If it's not pointing to a valid entry, respond messageId of the 
current position.
         // If the compaction cursor reach the end of the topic, respond 
messageId from compacted ledger
-        Optional<Position> compactionHorizon = 
persistentTopic.getCompactedTopic().getCompactionHorizon();
-        if (lastPosition.getEntryId() == -1 || (compactionHorizon.isPresent()
-                        && lastPosition.compareTo((PositionImpl) 
compactionHorizon.get()) <= 0)) {
+        Optional<Position> compactionHorizon = readCompacted
+                ? persistentTopic.getCompactedTopic().getCompactionHorizon() : 
Optional.empty();
+        if (lastPosition.getEntryId() == -1 || 
!ml.ledgerExists(lastPosition.getLedgerId())) {
+            // there is no entry in the original topic
+            if (compactionHorizon != null && compactionHorizon.isPresent()) {
+                // if readCompacted is true, we need to read the last entry 
from compacted topic
+                handleLastMessageIdFromCompactedLedger(persistentTopic, 
requestId, partitionIndex,
+                        markDeletePosition);
+                return;
+            } else {
+                // if readCompacted is false, we need to return 
MessageId.earliest
+                writeAndFlush(Commands.newGetLastMessageIdResponse(requestId, 
-1, -1, partitionIndex, -1,
+                        markDeletePosition != null ? 
markDeletePosition.getLedgerId() : -1,
+                        markDeletePosition != null ? 
markDeletePosition.getEntryId() : -1));
+            }
+            return;
+        }
+
+        if (compactionHorizon != null && compactionHorizon.isPresent()
+                        && lastPosition.compareTo((PositionImpl) 
compactionHorizon.get()) <= 0) {
             handleLastMessageIdFromCompactedLedger(persistentTopic, requestId, 
partitionIndex,
                     markDeletePosition);
             return;
@@ -2133,7 +2151,8 @@ public class ServerCnx extends PulsarHandler implements 
TransportCnx {
 
         batchSizeFuture.whenComplete((batchSize, e) -> {
             if (e != null) {
-                if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException) {
+                if (e.getCause() instanceof 
ManagedLedgerException.NonRecoverableLedgerException
+                        && readCompacted) {
                     handleLastMessageIdFromCompactedLedger(persistentTopic, 
requestId, partitionIndex,
                             markDeletePosition);
                 } else {
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
index 317b1a227e5..6c2d848bb7c 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/GetLastMessageIdCompactedTest.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.compaction;
 
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
@@ -32,6 +34,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.CompressionType;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerBuilder;
@@ -415,4 +418,28 @@ public class GetLastMessageIdCompactedTest extends 
ProducerConsumerBase {
         producer.close();
         admin.topics().delete(topicName, false);
     }
+
+    @Test(dataProvider = "enabledBatch")
+    public void testReaderStuckWithCompaction(boolean enabledBatch) throws 
Exception {
+        String topicName = "persistent://public/default/" + 
BrokerTestUtil.newUniqueName("tp");
+        String subName = "sub";
+        Producer<String> producer = createProducer(enabledBatch, topicName);
+        producer.newMessage().key("k0").value("v0").sendAsync();
+        producer.newMessage().key("k0").value("v1").sendAsync();
+        producer.flush();
+
+        triggerCompactionAndWait(topicName);
+        triggerLedgerSwitch(topicName);
+        clearAllTheLedgersOutdated(topicName);
+
+        var reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topicName)
+                .subscriptionName(subName)
+                .startMessageId(MessageId.earliest)
+                .create();
+        while (reader.hasMessageAvailable()) {
+            Message<String> message = reader.readNext(5, TimeUnit.SECONDS);
+            assertNotEquals(message, null);
+        }
+    }
 }

Reply via email to