This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 25c3ae3c575469e5ad115f71b54291a1b680dff5
Author: 萧易客 <[email protected]>
AuthorDate: Fri Dec 24 00:46:31 2021 +0800

    Fix dead loop in BacklogQuotaManager.dropBacklogForTimeLimit (#13194) 
(#13249)
    
    Fixes #13194
    
    ### Motivation
    
https://github.com/apache/pulsar/blob/38fb839154462fc5c6b0b4293f02762ed4021cd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L200-L219
    BacklogQuotaManager.dropBacklogForTimeLimit may fall into a dead (infinite)
    loop under some conditions, e.g. when `backlogQuotaDefaultLimitSecond` is
    enabled:
    1. the producer stops producing after having produced some messages; the
       current ledger is A
    2. time is up and a ledger rollover is triggered; a new ledger B is created,
       which is empty (no entries)
    3. now lastConfirmedEntry is `A:last-entry-id`
    4. after `backlogQuotaDefaultLimitSecond` elapses, it resets the cursor to
       position `A:last-entry-id+1`, which is the only valid position, so the
       loop begins and keeps running until the producer resumes producing
    
    ### Modifications
    
    Record the previous slowestReaderPosition; if it is the same as the new
    slowestReaderPosition after `resetCursor`, exit the loop.
    
    (cherry picked from commit 021409b6e1d3c910afe8e05d51a536cee647cb90)
---
 .../pulsar/broker/service/BacklogQuotaManager.java | 28 ++++++++-----
 .../broker/service/BacklogQuotaManagerTest.java    | 48 ++++++++++++++++++++++
 2 files changed, 65 insertions(+), 11 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index c9cd15e..65ffaab 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -26,9 +26,10 @@ import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
@@ -241,17 +242,22 @@ public class BacklogQuotaManager {
             Long currentMillis = ((ManagedLedgerImpl) 
persistentTopic.getManagedLedger()).getClock().millis();
             ManagedLedgerImpl mLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
             try {
-                Long ledgerId =  
mLedger.getCursors().getSlowestReaderPosition().getLedgerId();
-                MLDataFormats.ManagedLedgerInfo.LedgerInfo  ledgerInfo = 
mLedger.getLedgerInfo(ledgerId).get();
-                // Timestamp only > 0 if ledger has been closed
-                while (ledgerInfo.getTimestamp() > 0
-                        && currentMillis - ledgerInfo.getTimestamp() > 
quota.getLimitTime()) {
+                for (;;) {
                     ManagedCursor slowestConsumer = 
mLedger.getSlowestConsumer();
-                    // skip whole ledger for the slowest cursor
-                    slowestConsumer.resetCursor(mLedger.getNextValidPosition(
-                            PositionImpl.get(ledgerInfo.getLedgerId(), 
ledgerInfo.getEntries() - 1)));
-                    ledgerId =  
mLedger.getCursors().getSlowestReaderPosition().getLedgerId();
-                    ledgerInfo = mLedger.getLedgerInfo(ledgerId).get();
+                    Position oldestPosition = 
slowestConsumer.getMarkDeletedPosition();
+                    ManagedLedgerInfo.LedgerInfo ledgerInfo = 
mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
+                    // Timestamp only > 0 if ledger has been closed
+                    if (ledgerInfo.getTimestamp() > 0
+                            && currentMillis - ledgerInfo.getTimestamp() > 
quota.getLimitTime()) {
+                        // skip whole ledger for the slowest cursor
+                        PositionImpl nextPosition = 
mLedger.getNextValidPosition(
+                                PositionImpl.get(ledgerInfo.getLedgerId(), 
ledgerInfo.getEntries() - 1));
+                        if (!nextPosition.equals(oldestPosition)) {
+                            slowestConsumer.resetCursor(nextPosition);
+                            continue;
+                        }
+                    }
+                    break;
                 }
             } catch (Exception e) {
                 log.error("[{}] Error resetting cursor for slowest consumer 
[{}]", persistentTopic.getName(),
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 560cd9e..c0fc189 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -477,6 +477,54 @@ public class BacklogQuotaManagerTest {
     }
 
     @Test
+    public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws 
Exception {
+        assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+                Maps.newHashMap());
+        admin.namespaces().setBacklogQuota("prop/ns-quota",
+                BacklogQuota.builder()
+                        .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
+                        
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+                        .build(), BacklogQuota.BacklogQuotaType.message_age);
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS)
+                .build();
+
+        final String topic = "persistent://prop/ns-quota/topic4";
+        final String subName = "c1";
+
+        Consumer<byte[]> consumer = 
client.newConsumer().topic(topic).subscriptionName(subName).subscribe();
+        org.apache.pulsar.client.api.Producer<byte[]> producer = 
createProducer(client, topic);
+        producer.send(new byte[1024]);
+        consumer.receive();
+
+        admin.topics().unload(topic);
+        PersistentTopicInternalStats internalStats = 
admin.topics().getInternalStats(topic);
+        assertEquals(internalStats.ledgers.size(), 2);
+        assertEquals(internalStats.ledgers.get(1).entries, 0);
+
+        TopicStats stats = admin.topics().getStats(topic);
+        assertEquals(stats.getSubscriptions().get(subName).getMsgBacklog(), 1);
+
+        TimeUnit.SECONDS.sleep(TIME_TO_CHECK_BACKLOG_QUOTA);
+
+        Awaitility.await()
+                .pollInterval(Duration.ofSeconds(1))
+                .atMost(Duration.ofSeconds(TIME_TO_CHECK_BACKLOG_QUOTA))
+                .untilAsserted(() -> {
+                    rolloverStats();
+
+                    // Cause the last ledger is empty, it is not possible to 
skip first ledger,
+                    // so the number of ledgers will keep unchanged, and 
backlog is clear
+                    PersistentTopicInternalStats latestInternalStats = 
admin.topics().getInternalStats(topic);
+                    assertEquals(latestInternalStats.ledgers.size(), 2);
+                    assertEquals(latestInternalStats.ledgers.get(1).entries, 
0);
+                    TopicStats latestStats = admin.topics().getStats(topic);
+                    
assertEquals(latestStats.getSubscriptions().get(subName).getMsgBacklog(), 0);
+                });
+
+        client.close();
+    }
+
+    @Test
     public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception 
{
         assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
                 Maps.newHashMap());

Reply via email to