This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 715dd5a885 Adding the EnableLmqStats option allows monitoring of LMQ
statistics at runtime (#8973)
715dd5a885 is described below
commit 715dd5a885ae89ebc05aea33971029d7306c80ae
Author: rongtong <[email protected]>
AuthorDate: Fri Nov 22 12:57:38 2024 +0800
Adding the EnableLmqStats option allows monitoring of LMQ statistics at
runtime (#8973)
---
.../apache/rocketmq/broker/BrokerController.java | 2 +-
.../org/apache/rocketmq/common/BrokerConfig.java | 11 ++
.../store/stats/LmqBrokerStatsManager.java | 117 ++++++++++++---------
3 files changed, 81 insertions(+), 49 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
index 143922e456..b907489bbf 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java
@@ -341,7 +341,7 @@ public class BrokerController {
this.messageStoreConfig = messageStoreConfig;
this.authConfig = authConfig;
this.setStoreHost(new
InetSocketAddress(this.getBrokerConfig().getBrokerIP1(), getListenPort()));
- this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new
LmqBrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat()) : new
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat());
+ this.brokerStatsManager = messageStoreConfig.isEnableLmq() ? new
LmqBrokerStatsManager(this.brokerConfig) : new
BrokerStatsManager(this.brokerConfig.getBrokerClusterName(),
this.brokerConfig.isEnableDetailStat());
this.broadcastOffsetManager = new BroadcastOffsetManager(this);
if
(ConfigManagerVersion.V2.getVersion().equals(brokerConfig.getConfigManagerVersion()))
{
this.configStorage = new
ConfigStorage(messageStoreConfig.getStorePathRootDir());
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index f459abf0db..9d8d913521 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -435,6 +435,9 @@ public class BrokerConfig extends BrokerIdentity {
private boolean appendCkAsync = false;
+
+ private boolean enableLmqStats = false;
+
/**
* V2 is recommended in cases where LMQ feature is extensively used.
*/
@@ -1905,6 +1908,14 @@ public class BrokerConfig extends BrokerIdentity {
this.appendCkAsync = appendCkAsync;
}
+ public boolean isEnableLmqStats() {
+ return enableLmqStats;
+ }
+
+ public void setEnableLmqStats(boolean enableLmqStats) {
+ this.enableLmqStats = enableLmqStats;
+ }
+
public String getConfigManagerVersion() {
return configManagerVersion;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java
b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java
index b17fcbc9ca..20ed879331 100644
---
a/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/stats/LmqBrokerStatsManager.java
@@ -16,23 +16,29 @@
*/
package org.apache.rocketmq.store.stats;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
public class LmqBrokerStatsManager extends BrokerStatsManager {
- public LmqBrokerStatsManager(String clusterName, boolean enableQueueStat) {
- super(clusterName, enableQueueStat);
+ private final BrokerConfig brokerConfig;
+
+ public LmqBrokerStatsManager(BrokerConfig brokerConfig) {
+ super(brokerConfig.getBrokerClusterName(),
brokerConfig.isEnableDetailStat());
+ this.brokerConfig = brokerConfig;
}
@Override
public void incGroupGetNums(final String group, final String topic, final
int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.incGroupGetNums(lmqGroup, lmqTopic, incValue);
}
@@ -41,25 +47,28 @@ public class LmqBrokerStatsManager extends
BrokerStatsManager {
public void incGroupGetSize(final String group, final String topic, final
int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.incGroupGetSize(lmqGroup, lmqTopic, incValue);
}
-
@Override
public void incGroupAckNums(final String group, final String topic, final
int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.incGroupAckNums(lmqGroup, lmqTopic, incValue);
}
@@ -68,11 +77,13 @@ public class LmqBrokerStatsManager extends
BrokerStatsManager {
public void incGroupCkNums(final String group, final String topic, final
int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.incGroupCkNums(lmqGroup, lmqTopic, incValue);
}
@@ -81,11 +92,13 @@ public class LmqBrokerStatsManager extends
BrokerStatsManager {
public void incGroupGetLatency(final String group, final String topic,
final int queueId, final int incValue) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.incGroupGetLatency(lmqGroup, lmqTopic, queueId, incValue);
}
@@ -94,11 +107,13 @@ public class LmqBrokerStatsManager extends
BrokerStatsManager {
public void incSendBackNums(final String group, final String topic) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.incSendBackNums(lmqGroup, lmqTopic);
}
@@ -107,11 +122,13 @@ public class LmqBrokerStatsManager extends
BrokerStatsManager {
public double tpsGroupGetNums(final String group, final String topic) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
return super.tpsGroupGetNums(lmqGroup, lmqTopic);
}
@@ -121,11 +138,13 @@ public class LmqBrokerStatsManager extends
BrokerStatsManager {
final long fallBehind) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.recordDiskFallBehindTime(lmqGroup, lmqTopic, queueId,
fallBehind);
}
@@ -135,11 +154,13 @@ public class LmqBrokerStatsManager extends
BrokerStatsManager {
final long fallBehind) {
String lmqGroup = group;
String lmqTopic = topic;
- if (MixAll.isLmq(group)) {
- lmqGroup = MixAll.LMQ_PREFIX;
- }
- if (MixAll.isLmq(topic)) {
- lmqTopic = MixAll.LMQ_PREFIX;
+ if (!brokerConfig.isEnableLmqStats()) {
+ if (MixAll.isLmq(group)) {
+ lmqGroup = MixAll.LMQ_PREFIX;
+ }
+ if (MixAll.isLmq(topic)) {
+ lmqTopic = MixAll.LMQ_PREFIX;
+ }
}
super.recordDiskFallBehindSize(lmqGroup, lmqTopic, queueId,
fallBehind);
}