This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new 14824a5adbd [improve][broker]consumer backlog eviction policy should not reset read position for consumer (#18350)
14824a5adbd is described below

commit 14824a5adbd052f6293763a05f5f84021dfcf234
Author: Jiwei Guo <[email protected]>
AuthorDate: Mon Nov 7 10:26:11 2022 +0800

    [improve][broker]consumer backlog eviction policy should not reset read position for consumer (#18350)
---
 .../pulsar/broker/service/BacklogQuotaManager.java | 16 +++++---
 .../broker/service/BacklogQuotaManagerTest.java    | 47 +++++++++++++++-------
 2 files changed, 43 insertions(+), 20 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index ee12c3ff743..607e7387fb3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -239,22 +239,28 @@ public class BacklogQuotaManager {
             Long currentMillis = ((ManagedLedgerImpl) 
persistentTopic.getManagedLedger()).getClock().millis();
             ManagedLedgerImpl mLedger = (ManagedLedgerImpl) 
persistentTopic.getManagedLedger();
             try {
-                for (;;) {
+                for (; ; ) {
                     ManagedCursor slowestConsumer = 
mLedger.getSlowestConsumer();
                     Position oldestPosition = 
slowestConsumer.getMarkDeletedPosition();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] slowest consumer mark delete position 
is [{}], read position is [{}]",
+                                slowestConsumer.getName(), oldestPosition, 
slowestConsumer.getReadPosition());
+                    }
                     ManagedLedgerInfo.LedgerInfo ledgerInfo = 
mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get();
                     if (ledgerInfo == null) {
-                        
slowestConsumer.resetCursor(mLedger.getNextValidPosition((PositionImpl) 
oldestPosition));
+                        PositionImpl nextPosition =
+                                
PositionImpl.get(mLedger.getNextValidLedger(oldestPosition.getLedgerId()), -1);
+                        slowestConsumer.markDelete(nextPosition);
                         continue;
                     }
                     // Timestamp only > 0 if ledger has been closed
                     if (ledgerInfo.getTimestamp() > 0
                             && currentMillis - ledgerInfo.getTimestamp() > 
quota.getLimitTime()) {
                         // skip whole ledger for the slowest cursor
-                        PositionImpl nextPosition = 
mLedger.getNextValidPosition(
-                                PositionImpl.get(ledgerInfo.getLedgerId(), 
ledgerInfo.getEntries() - 1));
+                        PositionImpl nextPosition =
+                                
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
                         if (!nextPosition.equals(oldestPosition)) {
-                            slowestConsumer.resetCursor(nextPosition);
+                            slowestConsumer.markDelete(nextPosition);
                             continue;
                         }
                     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 21fcffac1ea..781a90a1c10 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -35,8 +35,11 @@ import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import lombok.Cleanup;
+import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -47,7 +50,6 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ClusterData;
-import org.apache.pulsar.common.policies.data.ClusterDataImpl;
 import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.apache.pulsar.common.policies.data.TopicStats;
@@ -148,7 +150,7 @@ public class BacklogQuotaManagerTest {
     }
 
     /**
-     * Readers should not effect backlog quota
+     * Readers should not effect backlog quota.
      */
     @Test
     public void testBacklogQuotaWithReader() throws Exception {
@@ -160,11 +162,13 @@ public class BacklogQuotaManagerTest {
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                         .build());
-        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();) {
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();) {
             final String topic1 = "persistent://prop/ns-quota/topic1";
             final int numMsgs = 20;
 
-            Reader<byte[]> reader = 
client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
+            Reader<byte[]> reader = 
client.newReader().topic(topic1).receiverQueueSize(1)
+                    .startMessageId(MessageId.latest).create();
 
             org.apache.pulsar.client.api.Producer<byte[]> producer = 
createProducer(client, topic1);
 
@@ -187,7 +191,7 @@ public class BacklogQuotaManagerTest {
             assertEquals(stats.getSubscriptions().size(), 1);
             long nonDurableSubscriptionBacklog = 
stats.getSubscriptions().values().iterator().next().getMsgBacklog();
             assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
-              "non-durable subscription backlog is [" + 
nonDurableSubscriptionBacklog + "]"); ;
+              "non-durable subscription backlog is [" + 
nonDurableSubscriptionBacklog + "]");
 
             try {
                 // try to send over backlog quota and make sure it fails
@@ -237,10 +241,12 @@ public class BacklogQuotaManagerTest {
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                         .build());
-        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();) {
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();) {
             final String topic1 = "persistent://prop/ns-quota/topic1" + 
UUID.randomUUID();
             final int numMsgs = 20;
-            Reader<byte[]> reader = 
client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
+            Reader<byte[]> reader = 
client.newReader().topic(topic1).receiverQueueSize(1)
+                    .startMessageId(MessageId.latest).create();
             Producer<byte[]> producer = createProducer(client, topic1);
             byte[] content = new byte[1024];
             for (int i = 0; i < numMsgs; i++) {
@@ -257,7 +263,7 @@ public class BacklogQuotaManagerTest {
             assertEquals(stats.getSubscriptions().size(), 1);
             long nonDurableSubscriptionBacklog = 
stats.getSubscriptions().values().iterator().next().getMsgBacklog();
             assertEquals(nonDurableSubscriptionBacklog, MAX_ENTRIES_PER_LEDGER,
-              "non-durable subscription backlog is [" + 
nonDurableSubscriptionBacklog + "]"); ;
+              "non-durable subscription backlog is [" + 
nonDurableSubscriptionBacklog + "]");
             try {
                 // try to send over backlog quota and make sure it fails
                 for (int i = 0; i < numMsgs; i++) {
@@ -307,10 +313,12 @@ public class BacklogQuotaManagerTest {
                         .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA)
                         
.retentionPolicy(BacklogQuota.RetentionPolicy.producer_exception)
                         .build());
-        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, 
TimeUnit.SECONDS).build();) {
+        try (PulsarClient client = 
PulsarClient.builder().serviceUrl(adminUrl.toString())
+                .statsInterval(0, TimeUnit.SECONDS).build();) {
             final String topic1 = "persistent://prop/ns-quota/topic2" + 
UUID.randomUUID();
             final int numMsgs = 9;
-            Reader<byte[]> reader = 
client.newReader().topic(topic1).receiverQueueSize(1).startMessageId(MessageId.latest).create();
+            Reader<byte[]> reader = 
client.newReader().topic(topic1).receiverQueueSize(1)
+                    .startMessageId(MessageId.latest).create();
             Producer<byte[]> producer = createProducer(client, topic1);
             byte[] content = new byte[1024];
             for (int i = 0; i < numMsgs; i++) {
@@ -472,14 +480,22 @@ public class BacklogQuotaManagerTest {
         assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 
14);
         assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 
14);
 
+        PersistentTopic topic1Reference = (PersistentTopic) 
pulsar.getBrokerService().getTopicReference(topic1).get();
+        ManagedLedgerImpl ml = (ManagedLedgerImpl) 
topic1Reference.getManagedLedger();
+        Position slowConsumerReadPos = 
ml.getSlowestConsumer().getReadPosition();
+
         Thread.sleep((TIME_TO_CHECK_BACKLOG_QUOTA * 2) * 1000);
         rolloverStats();
 
-        stats = admin.topics().getStats(topic1);
+        TopicStats stats2 = admin.topics().getStats(topic1);
         // Messages on first 2 ledgers should be expired, backlog is number of
-        // message in current ledger which should be 4.
-        assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 
4);
-        assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 
4);
+        // message in current ledger.
+        Awaitility.await().untilAsserted(() -> {
+            
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 
ml.getCurrentLedgerEntries());
+            
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 
ml.getCurrentLedgerEntries());
+        });
+
+        assertEquals(ml.getSlowestConsumer().getReadPosition(), 
slowConsumerReadPos);
         client.close();
     }
 
@@ -1284,7 +1300,8 @@ public class BacklogQuotaManagerTest {
         pulsar.start();
 
         @Cleanup
-        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).statsInterval(0,
 TimeUnit.SECONDS)
+        PulsarClient client = 
PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl())
+                .statsInterval(0, TimeUnit.SECONDS)
                 .build();
 
         final String topic1 = "persistent://prop/ns-quota/topic2";

Reply via email to