This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b936f46d2dd [improve][broker] Optimize message expiration rate repeated update issues (#24073) b936f46d2dd is described below commit b936f46d2dd2ade98521b7825ee9277549703ef1 Author: zhenJiangWang <zhenjiang...@gmail.com> AuthorDate: Thu Mar 20 02:15:18 2025 +0800 [improve][broker] Optimize message expiration rate repeated update issues (#24073) Co-authored-by: zjxxzjwang <zjxxzjw...@tencent.com> --- .../broker/service/persistent/PersistentMessageExpiryMonitor.java | 4 ++-- .../apache/pulsar/broker/service/persistent/PersistentReplicator.java | 1 - .../org/apache/pulsar/broker/service/persistent/PersistentTopic.java | 1 - 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java index 3b4bc9d8bce..4cd696b0eb7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java @@ -170,11 +170,12 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag } - public void updateRates() { + private void updateRates() { msgExpired.calculateRate(); } public double getMessageExpiryRate() { + updateRates(); return msgExpired.getRate(); } @@ -190,7 +191,6 @@ public class PersistentMessageExpiryMonitor implements FindEntryCallback, Messag long numMessagesExpired = (long) ctx - cursor.getNumberOfEntriesInBacklog(false); msgExpired.recordMultipleEvents(numMessagesExpired, 0 /* no value stats */); totalMsgExpired.add(numMessagesExpired); - updateRates(); // If the subscription is a Key_Shared subscription, we should to trigger message dispatch. if (subscription != null && subscription.getType() == SubType.Key_Shared) { subscription.getDispatcher().markDeletePositionMoveForward(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java index 881b1b804e8..1b52d5dee67 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentReplicator.java @@ -639,7 +639,6 @@ public abstract class PersistentReplicator extends AbstractReplicator public void updateRates() { msgOut.calculateRate(); msgExpired.calculateRate(); - expiryMonitor.updateRates(); stats.msgRateOut = msgOut.getRate(); stats.msgThroughputOut = msgOut.getValueRate(); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 704cf2b393c..43908329fc0 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2457,7 +2457,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal // Populate subscription specific stats here topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(true)); - subscription.getExpiryMonitor().updateRates(); topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate()); topicStatsStream.writePair("msgRateOut", subMsgRateOut); topicStatsStream.writePair("messageAckRate", subMsgAckRate);