This is an automated email from the ASF dual-hosted git repository.

huangqiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7404e0d77d9 [improve][broker] Consumer backlog eviction policy should not reset the consumer's read position (#18037)
7404e0d77d9 is described below

commit 7404e0d77d98c2ec4fab9a111b8ced38d33dcd5a
Author: Qiang Huang <[email protected]>
AuthorDate: Wed Oct 19 17:11:22 2022 +0800

    [improve][broker] Consumer backlog eviction policy should not reset the consumer's read position (#18037)
    
    ### Motivation
    Fixes #18036
    
    ### Modifications
    - The time-based backlog eviction policy now calls `markDelete` instead of `resetCursor` to advance the slowest consumer's mark-delete position, so the consumer's read position is left unchanged.
---
 .../pulsar/broker/service/BacklogQuotaManager.java       | 16 +++++++++++-----
 .../pulsar/broker/service/BacklogQuotaManagerTest.java   |  9 +++++++--
 2 files changed, 18 insertions(+), 7 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 93ae777a89e..210c6f8767a 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -210,22 +210,28 @@ public class BacklogQuotaManager {
             Long currentMillis = ((ManagedLedgerImpl) 
persistentTopic.getManagedLedger()).getClock().millis();
             ManagedLedgerImpl mLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
             try {
-                for (;;) {
+                for (; ; ) {
                     ManagedCursor slowestConsumer = 
mLedger.getSlowestConsumer();
                     Position oldestPosition = 
slowestConsumer.getMarkDeletedPosition();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] slowest consumer mark delete position 
is [{}], read position is [{}]",
+                                slowestConsumer.getName(), oldestPosition, 
slowestConsumer.getReadPosition());
+                    }
                     ManagedLedgerInfo.LedgerInfo ledgerInfo = 
mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
                     if (ledgerInfo == null) {
-                        
slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) 
oldestPosition));
+                        PositionImpl nextPosition =
+                                
PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1);
+                        slowestConsumer.markDelete(nextPosition);
                         continue;
                     }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
                             && currentMillis - ledgerInfo.getTimestamp() > 
quota.getLimitTime() * 1000) {
                         // skip whole ledger for the slowest cursor
-                        PositionImpl nextPosition = 
mLedger.getNextValidPosition(
-                                PositionImpl.get(ledgerInfo.getLedgerId(), 
ledgerInfo.getEntries() - 1));
+                        PositionImpl nextPosition =
+                                
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
                         if (!nextPosition.equals(oldestPosition)) {
-                            slowestConsumer.resetCursor(nextPosition);
+                            slowestConsumer.markDelete(nextPosition);
                             continue;
                         }
                     }
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 9eb6281eddc..97d1798c1e2 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -36,6 +36,7 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
@@ -529,18 +530,22 @@ public class BacklogQuotaManagerTest {
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 
14);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 
14);
 
+        PersistentTopic topic1Reference = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
topic1Reference.getManagedLedger();
+        Position slowConsumerReadPos = 
ml.getSlowestConsumer().getReadPosition();
+
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
         rolloverStats();
 
         TopicStats stats2 = getTopicStats(topic1);
-        PersistentTopic topic1Reference = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic1).get();
-        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
topic1Reference.getManagedLedger();
         // Messages on first 2 ledgers should be expired, backlog is number of
         // message in current ledger.
         Awaitility.await().untilAsserted(() -> {
             
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 
ml.getCurrentLedgerEntries());
             
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 
ml.getCurrentLedgerEntries());
         });
+
+        assertEquals(ml.getSlowestConsumer().getReadPosition(), 
slowConsumerReadPos);
         client.close();
     }
 

Reply via email to