This is an automated email from the ASF dual-hosted git repository. xiangying pushed a commit to branch branch-2.10 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 49e86d921c2ebbcaf0a93d3c11b2df666963c77d Author: Qiang Huang <[email protected]> AuthorDate: Mon Oct 10 11:04:08 2022 +0800 [fix][broker]unify time unit at dropping the backlog on a topic (#17957) (cherry picked from commit add77aa23a1cd8435b0a68687b461494ab1f6d26) --- .../pulsar/broker/service/BacklogQuotaManager.java | 2 +- .../broker/service/BacklogQuotaManagerTest.java | 54 ++++++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java index 915f9e7c6b9..2744469ea8d 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java @@ -226,7 +226,7 @@ public class BacklogQuotaManager { } // Timestamp only > 0 if ledger has been closed if (ledgerInfo.getTimestamp() > 0 - && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime()) { + && currentMillis - ledgerInfo.getTimestamp() > quota.getLimitTime() * 1000) { // skip whole ledger for the slowest cursor PositionImpl nextPosition = PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java index 6519632f948..69c0b8dd00e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java @@ -521,6 +521,60 @@ public class BacklogQuotaManagerTest { client.close(); } + @Test(timeOut = 60000) + public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws Exception { + assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"), + new HashMap<>()); + admin.namespaces().setBacklogQuota("prop/ns-quota", + BacklogQuota.builder() + .limitTime(5) // set limit time as 5 seconds + .retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction) + .build(), BacklogQuota.BacklogQuotaType.message_age); + PulsarClient client = PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0, TimeUnit.SECONDS) + .build(); + + final String topic1 = "persistent://prop/ns-quota/topic3" + UUID.randomUUID(); + final String subName1 = "c1"; + final String subName2 = "c2"; + int numMsgs = 5; + + Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe(); + Consumer<byte[]> consumer2 = client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe(); + org.apache.pulsar.client.api.Producer<byte[]> producer = createProducer(client, topic1); + byte[] content = new byte[1024]; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer1.receive(); + consumer2.receive(); + } + + TopicStats stats = getTopicStats(topic1); + assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(), 5); + assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(), 5); + + // Sleep 5000 mills for first 5 messages. + Thread.sleep(5000L); + numMsgs = 9; + for (int i = 0; i < numMsgs; i++) { + producer.send(content); + consumer1.receive(); + consumer2.receive(); + } + + // The first 5 messages are expired after sleeping 2000 more mills. + Thread.sleep(2000L); + rolloverStats(); + + TopicStats stats2 = getTopicStats(topic1); + // The first 5 messages should be expired due to limit time is 5 seconds, and the last 9 message should not. + Awaitility.await().untilAsserted(() -> { + assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9); + assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9); + }); + client.close(); + } + + @Test public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws Exception { assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
