This is an automated email from the ASF dual-hosted git repository.
bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.9 by this push:
new d6c85667e8e [fix][broker]unify time unit at dropping the backlog on a
topic (#17957)
d6c85667e8e is described below
commit d6c85667e8e46cd993801f2597fa26563548e51b
Author: Qiang Huang <[email protected]>
AuthorDate: Mon Oct 10 11:04:08 2022 +0800
[fix][broker]unify time unit at dropping the backlog on a topic (#17957)
(cherry picked from commit add77aa23a1cd8435b0a68687b461494ab1f6d26)
---
.../pulsar/broker/service/BacklogQuotaManager.java | 2 +-
.../broker/service/BacklogQuotaManagerTest.java | 54 ++++++++++++++++++++++
2 files changed, 55 insertions(+), 1 deletion(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
index 607e7387fb3..aa8eb981d44 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BacklogQuotaManager.java
@@ -255,7 +255,7 @@ public class BacklogQuotaManager {
}
// Timestamp only > 0 if ledger has been closed
if (ledgerInfo.getTimestamp() > 0
- && currentMillis - ledgerInfo.getTimestamp() >
quota.getLimitTime()) {
+ && currentMillis - ledgerInfo.getTimestamp() >
quota.getLimitTime() * 1000) {
// skip whole ledger for the slowest cursor
PositionImpl nextPosition =
PositionImpl.get(mLedger.getNextValidLedger(ledgerInfo.getLedgerId()), -1);
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
index 68139b99dcb..3d549cc712a 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java
@@ -522,6 +522,60 @@ public class BacklogQuotaManagerTest {
client.close();
}
+ @Test(timeOut = 60000)
+ public void testConsumerBacklogEvictionTimeQuotaWithPartEviction() throws
Exception {
+ assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),
+ new HashMap<>());
+ admin.namespaces().setBacklogQuota("prop/ns-quota",
+ BacklogQuota.builder()
+ .limitTime(5) // set limit time as 5 seconds
+
.retentionPolicy(BacklogQuota.RetentionPolicy.consumer_backlog_eviction)
+ .build(), BacklogQuota.BacklogQuotaType.message_age);
+ PulsarClient client =
PulsarClient.builder().serviceUrl(adminUrl.toString()).statsInterval(0,
TimeUnit.SECONDS)
+ .build();
+
+ final String topic1 = "persistent://prop/ns-quota/topic3" +
UUID.randomUUID();
+ final String subName1 = "c1";
+ final String subName2 = "c2";
+ int numMsgs = 5;
+
+ Consumer<byte[]> consumer1 =
client.newConsumer().topic(topic1).subscriptionName(subName1).subscribe();
+ Consumer<byte[]> consumer2 =
client.newConsumer().topic(topic1).subscriptionName(subName2).subscribe();
+ org.apache.pulsar.client.api.Producer<byte[]> producer =
createProducer(client, topic1);
+ byte[] content = new byte[1024];
+ for (int i = 0; i < numMsgs; i++) {
+ producer.send(content);
+ consumer1.receive();
+ consumer2.receive();
+ }
+
+ TopicStats stats = getTopicStats(topic1);
+ assertEquals(stats.getSubscriptions().get(subName1).getMsgBacklog(),
5);
+ assertEquals(stats.getSubscriptions().get(subName2).getMsgBacklog(),
5);
+
+ // Sleep 5000 mills for first 5 messages.
+ Thread.sleep(5000L);
+ numMsgs = 9;
+ for (int i = 0; i < numMsgs; i++) {
+ producer.send(content);
+ consumer1.receive();
+ consumer2.receive();
+ }
+
+ // The first 5 messages are expired after sleeping 2000 more mills.
+ Thread.sleep(2000L);
+ rolloverStats();
+
+ TopicStats stats2 = getTopicStats(topic1);
+ // The first 5 messages should be expired due to limit time is 5
seconds, and the last 9 message should not.
+ Awaitility.await().untilAsserted(() -> {
+
assertEquals(stats2.getSubscriptions().get(subName1).getMsgBacklog(), 9);
+
assertEquals(stats2.getSubscriptions().get(subName2).getMsgBacklog(), 9);
+ });
+ client.close();
+ }
+
+
@Test
public void testConsumerBacklogEvictionTimeQuotaWithEmptyLedger() throws
Exception {
assertEquals(admin.namespaces().getBacklogQuotaMap("prop/ns-quota"),