This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 3a2d674e0b31d5488a5c5183ca54c6701543741b
Author: zhenJiangWang <[email protected]>
AuthorDate: Fri May 23 17:38:34 2025 +0800

    [fix][broker] Resolve the issue of frequent updates in message expiration 
deletion rate (#24190)
    
    Co-authored-by: zjxxzjwang <[email protected]>
    (cherry picked from commit 81c94c8a912cf08ac3f561a2aab3fa910d781db8)
---
 .../broker/service/persistent/PersistentMessageExpiryMonitor.java     | 3 +--
 .../apache/pulsar/broker/service/persistent/PersistentReplicator.java | 1 +
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 1 +
 .../java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java    | 4 ++++
 4 files changed, 7 insertions(+), 2 deletions(-)

diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index f8d76e2dedd..7a18f4abe47 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -173,12 +173,11 @@ public class PersistentMessageExpiryMonitor implements 
FindEntryCallback, Messag
     }
 
 
-    private void updateRates() {
+    public void updateRates() {
         msgExpired.calculateRate();
     }
 
     public double getMessageExpiryRate() {
-        updateRates();
         return msgExpired.getRate();
     }
 
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 9f4cb8a1e5f..0afc4cd3ce8 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -594,6 +594,7 @@ public abstract class PersistentReplicator extends 
AbstractReplicator
     public void updateRates() {
         msgOut.calculateRate();
         msgExpired.calculateRate();
+        expiryMonitor.updateRates();
 
         stats.msgRateOut = msgOut.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d8c2eadd039..8685f92138f 100644
--- 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2572,6 +2572,7 @@ public class PersistentTopic extends AbstractTopic 
implements Topic, AddEntryCal
                 // Populate subscription specific stats here
                 topicStatsStream.writePair("msgBacklog",
                         subscription.getNumberOfEntriesInBacklog(true));
+                subscription.getExpiryMonitor().updateRates();
                 topicStatsStream.writePair("msgRateExpired", 
subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
                 topicStatsStream.writePair("messageAckRate", subMsgAckRate);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index cf590a2bb3f..2fd9cadd192 100644
--- 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -806,6 +806,10 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                 
pulsar.getBrokerService().getTopicIfExists(topic1).get().get().getSubscription(subName);
         PersistentSubscription sub2 = (PersistentSubscription)
                 
pulsar.getBrokerService().getTopicIfExists(topic2).get().get().getSubscription(subName);
+        Awaitility.await().until(() -> 
sub.getExpiryMonitor().getTotalMessageExpired() != 0);
+        Awaitility.await().until(() -> 
sub2.getExpiryMonitor().getTotalMessageExpired() != 0);
+        sub.getExpiryMonitor().updateRates();
+        sub2.getExpiryMonitor().updateRates();
         Awaitility.await().until(() -> sub.getExpiredMessageRate() != 0.0);
         Awaitility.await().until(() -> sub2.getExpiredMessageRate() != 0.0);
 

Reply via email to