This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b936f46d2dd [improve][broker] Optimize message expiration rate repeated update issues (#24073)
b936f46d2dd is described below

commit b936f46d2dd2ade98521b7825ee9277549703ef1
Author: zhenJiangWang <zhenjiang...@gmail.com>
AuthorDate: Thu Mar 20 02:15:18 2025 +0800

    [improve][broker] Optimize message expiration rate repeated update issues (#24073)
    
    Co-authored-by: zjxxzjwang <zjxxzjw...@tencent.com>
---
 .../broker/service/persistent/PersistentMessageExpiryMonitor.java     | 4 ++--
 .../apache/pulsar/broker/service/persistent/PersistentReplicator.java | 1 -
 .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java  | 1 -
 3 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
index 3b4bc9d8bce..4cd696b0eb7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java
@@ -170,11 +170,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
     }
 
 
-    public void updateRates() {
+    private void updateRates() {
         msgExpired.calculateRate();
     }
 
     public double getMessageExpiryRate() {
+        updateRates();
         return msgExpired.getRate();
     }
 
@@ -190,7 +191,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag
             long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false);
             msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */);
             totalMsgExpired.add(numMessagesExpired);
-            updateRates();
             // If the subscription is a Key_Shared subscription, we should to trigger message dispatch.
             if (subscription != null && subscription.getType() == SubType.Key_Shared) {
                 subscription.getDispatcher().markDeletePositionMoveForward();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
index 881b1b804e8..1b52d5dee67 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java
@@ -639,7 +639,6 @@ public abstract class PersistentReplicator extends AbstractReplicator
     public void updateRates() {
         msgOut.calculateRate();
         msgExpired.calculateRate();
-        expiryMonitor.updateRates();
 
         stats.msgRateOut = msgOut.getRate();
         stats.msgThroughputOut = msgOut.getValueRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 704cf2b393c..43908329fc0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2457,7 +2457,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 // Populate subscription specific stats here
                 topicStatsStream.writePair("msgBacklog",
                         subscription.getNumberOfEntriesInBacklog(true));
-                subscription.getExpiryMonitor().updateRates();
                 topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
                 topicStatsStream.writePair("msgRateOut", subMsgRateOut);
                 topicStatsStream.writePair("messageAckRate", subMsgAckRate);

Reply via email to