This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 93b83e8b1d8 [fix][broker] Fix issue where msgRateExpired may not refresh forever (#19759)
93b83e8b1d8 is described below

commit 93b83e8b1d803a2252548d69729c7f81bb1091e9
Author: Masahiro Sakamoto <[email protected]>
AuthorDate: Sun Mar 12 14:39:30 2023 +0900

    [fix][broker] Fix issue where msgRateExpired may not refresh forever (#19759)
---
 .../service/persistent/PersistentReplicator.java   |  2 ++
 .../broker/service/persistent/PersistentTopic.java |  1 +
 .../client/api/SimpleProducerConsumerStatTest.java | 41 ++++++++++++++++++++++
 3 files changed, 44 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 2e7dbb2fbf1..38a433b7629 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -683,6 +683,8 @@ public class PersistentReplicator extends AbstractReplicator
     public void updateRates() {
         msgOut.calculateRate();
         msgExpired.calculateRate();
+        expiryMonitor.updateRates();
+
         stats.msgRateOut = msgOut.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
         stats.msgRateExpired = msgExpired.getRate() + expiryMonitor.getMessageExpiryRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4c7b7bb979c..6213db88f80 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1772,6 +1772,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 // Populate subscription specific stats here
                 topicStatsStream.writePair("msgBacklog",
                         subscription.getNumberOfEntriesInBacklog(true));
+                subscription.getExpiryMonitor().updateRates();
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
                 topicStatsStream.writePair("messageAckRate", subMsgAckRate);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
index 0adeca4bcdb..55d2d812186 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerStatTest.java
@@ -515,4 +515,45 @@ public class SimpleProducerConsumerStatTest extends ProducerConsumerBase {
 
         log.info("-- Exiting {} test --", methodName);
     }
+
+    @Test
+    public void testMsgRateExpired() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        String topicName = "persistent://my-property/tp1/my-ns/" + methodName;
+        String subName = "my-sub";
+        admin.topics().createSubscription(topicName, subName, MessageId.latest);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(topicName)
+                .enableBatching(false)
+                .create();
+
+        int numMessages = 100;
+        for (int i = 0; i < numMessages; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Thread.sleep(2000);
+        admin.topics().expireMessages(topicName, subName, 1);
+        pulsar.getBrokerService().updateRates();
+
+        Awaitility.await().ignoreExceptions().timeout(5, TimeUnit.SECONDS)
+                .until(() -> admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgRateExpired() > 0.001);
+
+        Thread.sleep(2000);
+        pulsar.getBrokerService().updateRates();
+
+        Awaitility.await().ignoreExceptions().timeout(5, TimeUnit.SECONDS)
+                .until(() -> admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgRateExpired() < 0.001);
+
+        assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getMsgRateExpired(), 0.0,
+                0.001);
+        assertEquals(admin.topics().getStats(topicName).getSubscriptions().get(subName).getTotalMsgExpired(),
+                numMessages);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
 }

Reply via email to