This is an automated email from the ASF dual-hosted git repository. penghui pushed a commit to branch branch-2.9 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e4291454da5c291ea70f380008cddf4a3561b817 Author: 萧易客 <[email protected]> AuthorDate: Fri Dec 24 00:46:31 2021 +0800 Fix dead loop in BacklogQuotaManager.dropBacklogForTimeLimit (#13194) (#13249) Fixes #13194 ### Motivation https://github.com/apache/pulsar/blob/38fb839154462fc5c6b0b4293f02762ed4021cd9/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java#L200-L219 BacklogQuotaManager.dropBacklogForTimeLimit may fall into an infinite loop under some conditions, e.g. when `backlogQuotaDefaultLimitSecond` is enabled: 1. The producer stops producing after having produced some messages; the current ledger is A. 2. The ledger rollover time elapses and a rollover is triggered; a new ledger B is created, which is empty (no entries). 3. lastConfirmedEntry is now `A:last-entry-id`. 4. After `backlogQuotaDefaultLimitSecond` elapses, the cursor is reset to position `A:last-entry-id+1`, which is the only valid next position because ledger B is empty. The slowest reader therefore stays in ledger A, the loop condition still holds, and the loop keeps running until the producer resumes producing. ### Modifications Record the previous slowestReaderPosition; if it is the same as the new slowestReaderPosition after `resetCursor`, exit the loop. 
(cherry picked from commit 021409b6e1d3c910afe8e05d51a536cee647cb90) --- .../pulsar/broker/service/BacklogQuotaManager.java | 28 ++++++++----- .../broker/service/BacklogQuotaManagerTest.java | 48 ++++++++++++++++++++++ 2 files changed, 65 insertions(+), 11 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 042f9ff..6efc1a4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -26,9 +26,10 @@ import java.util.concurrent.CompletableFuture; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries; +import org.apache.bookkeeper.mledger.Position; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; -import org.apache.bookkeeper.mledger.proto.MLDataFormats; +import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.resources.NamespaceResources; import org.apache.pulsar.broker.service.persistent.PersistentTopic; @@ -238,17 +239,22 @@ public class BacklogQuotaManager { Long currentMillis = ((ManagedLedgerImpl) persistentTopic.getManagedLedger()).getClock().millis(); ManagedLedgerImpl mLedger = (ManagedLedgerImpl) persistentTopic.getManagedLedger(); try { - Long ledgerId = mLedger.getCursors().getSlowestReaderPosition().getLedgerId(); - MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(ledgerId).get(); - // Timestamp only > 0 if ledger has been closed - while (ledgerInfo.getTimestamp() > 0 - && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) { + for (;;) { ManagedCursor 
slowestConsumer = mLedger.getSlowestConsumer(); - // skip whole ledger for the slowest cursor - slowestConsumer.resetCursor(mLedger.getNextValidPosition( - PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1))); - ledgerId = mLedger.getCursors().getSlowestReaderPosition().getLedgerId(); - ledgerInfo = mLedger.getLedgerInfo(ledgerId).get(); + Position oldestPosition = slowestConsumer.getMarkDeletedPosition(); + ManagedLedgerInfo.LedgerInfo ledgerInfo = mLedger.getLedgerInfo(oldestPosition.getLedgerId()).get(); + // Timestamp only > 0 if ledger has been closed + if (ledgerInfo.getTimestamp() > 0 + && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) { + // skip whole ledger for the slowest cursor + PositionImpl nextPosition = mLedger.getNextValidPosition( + PositionImpl.get(ledgerInfo.getLedgerId(), ledgerInfo.getEntries() - 1)); + if (!nextPosition.equals(oldestPosition)) { + slowestConsumer.resetCursor(nextPosition); + continue; + } + } + break; } } catch (Exception e) { log.error("[{}] Error resetting cursor for slowest consumer [{}]", persistentTopic.getName(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 0dac0c2..f706b3f 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -484,6 +484,54 @@ public class BacklogQuotaManagerTest { } @Test + public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception { + assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), + Maps.newHashMap()); + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitTime(TIME_TO_CHECK_BACKLOG_QUOTA) + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(), 
BacklogQuota.BacklogQuotaType.message_age); + PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String topic = "persistent://prop/ns-quota/topic4"; + final String subName = "c1"; + + Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(subName).subscribe(); + org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic); + producer.send(new byte[1024]); + consumer.receive(); + + admin.topics().unload(topic); + PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topic); + assertEquals(internalStats.ledgers.size(), 2); + assertEquals(internalStats.ledgers.get(1).entries, 0); + + TopicStats stats = admin.topics().getStats(topic); + assertEquals(stats.getSubscriptions().get(subName).getMsgBacklog(), 1); + + TimeUnit.SECONDS.sleep(TIME_TO_CHECK_BACKLOG_QUOTA); + + Awaitility.await() + .pollInterval(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(TIME_TO_CHECK_BACKLOG_QUOTA)) + .untilAsserted(() -> { + rolloverStats(); + + // Cause the last ledger is empty, it is not possible to skip first ledger, + // so the number of ledgers will keep unchanged, and backlog is clear + PersistentTopicInternalStats latestInternalStats = admin.topics().getInternalStats(topic); + assertEquals(latestInternalStats.ledgers.size(), 2); + assertEquals(latestInternalStats.ledgers.get(1).entries, 0); + TopicStats latestStats = admin.topics().getStats(topic); + assertEquals(latestStats.getSubscriptions().get(subName).getMsgBacklog(), 0); + }); + + client.close(); + } + + @Test public void testConsumerBacklogEvictionWithAckSizeQuota() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), Maps.newHashMap());
