This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3696be0632 [ISSUE #8463] Some statistical items should also be deleted
to prevent memory leakage when a topic or group is deleted (#8464)
3696be0632 is described below
commit 3696be06321c24b4c534a5a6299fd587710b5de4
Author: rongtong <[email protected]>
AuthorDate: Thu Aug 1 10:04:11 2024 +0800
[ISSUE #8463] Some statistical items should also be deleted to prevent
memory leakage when a topic or group is deleted (#8464)
* Some important statistical items should also be deleted to prevent memory
leakage when a topic or group is deleted
* Add UTs
---
.../org/apache/rocketmq/common/stats/Stats.java | 3 +++
.../rocketmq/store/stats/BrokerStatsManager.java | 24 +++++++++++++---------
.../store/stats/BrokerStatsManagerTest.java | 13 ++++++++++++
3 files changed, 30 insertions(+), 10 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java
b/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java
index b70f96e412..f67ccf9ae9 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/Stats.java
@@ -44,4 +44,7 @@ public class Stats {
public static final String GROUP_GET_FALL_SIZE = "GROUP_GET_FALL_SIZE";
public static final String GROUP_GET_FALL_TIME = "GROUP_GET_FALL_TIME";
public static final String GROUP_GET_LATENCY = "GROUP_GET_LATENCY";
+ public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
+ public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
+ public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index c165d333fd..a6c33f6131 100644
---
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -69,9 +69,9 @@ public class BrokerStatsManager {
@Deprecated public static final String COMMERCIAL_PERM_FAILURES =
Stats.COMMERCIAL_PERM_FAILURES;
// Send message latency
- public static final String TOPIC_PUT_LATENCY = "TOPIC_PUT_LATENCY";
- public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
- public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
+ @Deprecated public static final String TOPIC_PUT_LATENCY =
"TOPIC_PUT_LATENCY";
+ @Deprecated public static final String GROUP_ACK_NUMS = "GROUP_ACK_NUMS";
+ @Deprecated public static final String GROUP_CK_NUMS = "GROUP_CK_NUMS";
public static final String DLQ_PUT_NUMS = "DLQ_PUT_NUMS";
public static final String BROKER_ACK_NUMS = "BROKER_ACK_NUMS";
public static final String BROKER_CK_NUMS = "BROKER_CK_NUMS";
@@ -179,10 +179,10 @@ public class BrokerStatsManager {
this.statsTable.put(Stats.TOPIC_PUT_SIZE, new
StatsItemSet(Stats.TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_NUMS, new
StatsItemSet(Stats.GROUP_GET_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_SIZE, new
StatsItemSet(Stats.GROUP_GET_SIZE, this.scheduledExecutorService, log));
- this.statsTable.put(GROUP_ACK_NUMS, new StatsItemSet(GROUP_ACK_NUMS,
this.scheduledExecutorService, log));
- this.statsTable.put(GROUP_CK_NUMS, new StatsItemSet(GROUP_CK_NUMS,
this.scheduledExecutorService, log));
+ this.statsTable.put(Stats.GROUP_ACK_NUMS, new
StatsItemSet(Stats.GROUP_ACK_NUMS, this.scheduledExecutorService, log));
+ this.statsTable.put(Stats.GROUP_CK_NUMS, new
StatsItemSet(Stats.GROUP_CK_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(Stats.GROUP_GET_LATENCY, new
StatsItemSet(Stats.GROUP_GET_LATENCY, this.scheduledExecutorService, log));
- this.statsTable.put(TOPIC_PUT_LATENCY, new
StatsItemSet(TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
+ this.statsTable.put(Stats.TOPIC_PUT_LATENCY, new
StatsItemSet(Stats.TOPIC_PUT_LATENCY, this.scheduledExecutorService, log));
this.statsTable.put(Stats.SNDBCK_PUT_NUMS, new
StatsItemSet(Stats.SNDBCK_PUT_NUMS, this.scheduledExecutorService, log));
this.statsTable.put(DLQ_PUT_NUMS, new StatsItemSet(DLQ_PUT_NUMS,
this.scheduledExecutorService, log));
this.statsTable.put(Stats.BROKER_PUT_NUMS, new
StatsItemSet(Stats.BROKER_PUT_NUMS, this.scheduledExecutorService, log));
@@ -338,10 +338,13 @@ public class BrokerStatsManager {
}
this.statsTable.get(Stats.GROUP_GET_NUMS).delValueByPrefixKey(topic,
"@");
this.statsTable.get(Stats.GROUP_GET_SIZE).delValueByPrefixKey(topic,
"@");
+ this.statsTable.get(Stats.GROUP_CK_NUMS).delValueByPrefixKey(topic,
"@");
+ this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueByPrefixKey(topic,
"@");
this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueByPrefixKey(topic,
"@");
this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueByPrefixKey(topic,
"@");
this.statsTable.get(Stats.SNDBCK_PUT_NUMS).delValueByPrefixKey(topic,
"@");
this.statsTable.get(Stats.GROUP_GET_LATENCY).delValueByInfixKey(topic,
"@");
+
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).delValueBySuffixKey(topic, "@");
this.momentStatsItemSetFallSize.delValueByInfixKey(topic, "@");
this.momentStatsItemSetFallTime.delValueByInfixKey(topic, "@");
}
@@ -349,6 +352,8 @@ public class BrokerStatsManager {
public void onGroupDeleted(final String group) {
this.statsTable.get(Stats.GROUP_GET_NUMS).delValueBySuffixKey(group,
"@");
this.statsTable.get(Stats.GROUP_GET_SIZE).delValueBySuffixKey(group,
"@");
+ this.statsTable.get(Stats.GROUP_CK_NUMS).delValueBySuffixKey(group,
"@");
+ this.statsTable.get(Stats.GROUP_ACK_NUMS).delValueBySuffixKey(group,
"@");
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_GET_NUMS).delValueBySuffixKey(group, "@");
this.statsTable.get(Stats.QUEUE_GET_SIZE).delValueBySuffixKey(group, "@");
@@ -434,12 +439,12 @@ public class BrokerStatsManager {
public void incGroupCkNums(final String group, final String topic, final
int incValue) {
final String statsKey = buildStatsKey(topic, group);
- this.statsTable.get(GROUP_CK_NUMS).addValue(statsKey, incValue, 1);
+ this.statsTable.get(Stats.GROUP_CK_NUMS).addValue(statsKey, incValue,
1);
}
public void incGroupAckNums(final String group, final String topic, final
int incValue) {
final String statsKey = buildStatsKey(topic, group);
- this.statsTable.get(GROUP_ACK_NUMS).addValue(statsKey, incValue, 1);
+ this.statsTable.get(Stats.GROUP_ACK_NUMS).addValue(statsKey, incValue,
1);
}
public String buildStatsKey(String topic, String group) {
@@ -509,9 +514,8 @@ public class BrokerStatsManager {
statsKey = new StringBuilder(6);
}
statsKey.append(queueId).append("@").append(topic);
- this.statsTable.get(TOPIC_PUT_LATENCY).addValue(statsKey.toString(),
incValue, 1);
+
this.statsTable.get(Stats.TOPIC_PUT_LATENCY).addValue(statsKey.toString(),
incValue, 1);
}
-
public void incBrokerPutNums() {
this.statsTable.get(Stats.BROKER_PUT_NUMS).getAndCreateStatsItem(this.clusterName).getValue().add(1);
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
index a602da0939..058ad0b020 100644
---
a/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
+++
b/store/src/test/java/org/apache/rocketmq/store/stats/BrokerStatsManagerTest.java
@@ -24,6 +24,8 @@ import org.junit.Before;
import org.junit.Test;
import static org.apache.rocketmq.common.stats.Stats.BROKER_PUT_NUMS;
+import static org.apache.rocketmq.common.stats.Stats.GROUP_ACK_NUMS;
+import static org.apache.rocketmq.common.stats.Stats.GROUP_CK_NUMS;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_SIZE;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_FALL_TIME;
import static org.apache.rocketmq.common.stats.Stats.GROUP_GET_LATENCY;
@@ -34,6 +36,7 @@ import static
org.apache.rocketmq.common.stats.Stats.QUEUE_GET_SIZE;
import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.QUEUE_PUT_SIZE;
import static org.apache.rocketmq.common.stats.Stats.SNDBCK_PUT_NUMS;
+import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_LATENCY;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_NUMS;
import static org.apache.rocketmq.common.stats.Stats.TOPIC_PUT_SIZE;
import static org.assertj.core.api.Assertions.assertThat;
@@ -139,8 +142,11 @@ public class BrokerStatsManagerTest {
brokerStatsManager.incTopicPutSize(TOPIC, 100);
brokerStatsManager.incQueuePutNums(TOPIC, QUEUE_ID);
brokerStatsManager.incQueuePutSize(TOPIC, QUEUE_ID, 100);
+ brokerStatsManager.incTopicPutLatency(TOPIC, QUEUE_ID, 10);
brokerStatsManager.incGroupGetNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incGroupGetSize(GROUP_NAME, TOPIC, 100);
+ brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
+ brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.incQueueGetNums(GROUP_NAME, TOPIC, QUEUE_ID, 1);
brokerStatsManager.incQueueGetSize(GROUP_NAME, TOPIC, QUEUE_ID, 100);
brokerStatsManager.incSendBackNums(GROUP_NAME, TOPIC);
@@ -162,6 +168,9 @@ public class BrokerStatsManagerTest {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY,
"1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE,
"1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME,
"1@" + TOPIC + "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC
+ "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS,
TOPIC + "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(TOPIC_PUT_LATENCY,
QUEUE_ID + "@" + TOPIC));
}
@Test
@@ -174,6 +183,8 @@ public class BrokerStatsManagerTest {
brokerStatsManager.incGroupGetLatency(GROUP_NAME, TOPIC, 1, 1);
brokerStatsManager.recordDiskFallBehindTime(GROUP_NAME, TOPIC, 1, 11L);
brokerStatsManager.recordDiskFallBehindSize(GROUP_NAME, TOPIC, 1, 11L);
+ brokerStatsManager.incGroupCkNums(GROUP_NAME, TOPIC, 1);
+ brokerStatsManager.incGroupAckNums(GROUP_NAME, TOPIC, 1);
brokerStatsManager.onGroupDeleted(GROUP_NAME);
@@ -185,6 +196,8 @@ public class BrokerStatsManagerTest {
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_LATENCY,
"1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_SIZE,
"1@" + TOPIC + "@" + GROUP_NAME));
Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_GET_FALL_TIME,
"1@" + TOPIC + "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_CK_NUMS, TOPIC
+ "@" + GROUP_NAME));
+ Assert.assertNull(brokerStatsManager.getStatsItem(GROUP_ACK_NUMS,
TOPIC + "@" + GROUP_NAME));
}
@Test