This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4a15256f37 [ISSUE #9300] Periodic cleanup of inactive items in
StatsItemSet (#9301)
4a15256f37 is described below
commit 4a15256f378a88253bb1e77b27fa02eb85d57ebd
Author: ymwneu <[email protected]>
AuthorDate: Tue Apr 1 13:43:56 2025 +0800
[ISSUE #9300] Periodic cleanup of inactive items in StatsItemSet (#9301)
---
.../org/apache/rocketmq/common/BrokerConfig.java | 10 +++++
.../rocketmq/common/stats/MomentStatsItem.java | 9 +++++
.../rocketmq/common/stats/MomentStatsItemSet.java | 23 ++++++++++++
.../apache/rocketmq/common/stats/StatsItem.java | 9 +++++
.../apache/rocketmq/common/stats/StatsItemSet.java | 19 ++++++++++
.../rocketmq/store/stats/BrokerStatsManager.java | 43 +++++++++++++++++++++-
6 files changed, 111 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index dd34544935..b7ec944505 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -130,6 +130,8 @@ public class BrokerConfig extends BrokerIdentity {
private boolean accountStatsEnable = true;
private boolean accountStatsPrintZeroValues = true;
+ private int maxStatsIdleTimeInMinutes = -1;
+
private boolean transferMsgByHeap = true;
private String regionId = MixAll.DEFAULT_TRACE_REGION_ID;
@@ -1535,6 +1537,14 @@ public class BrokerConfig extends BrokerIdentity {
this.accountStatsPrintZeroValues = accountStatsPrintZeroValues;
}
+ public int getMaxStatsIdleTimeInMinutes() {
+ return maxStatsIdleTimeInMinutes;
+ }
+
+ public void setMaxStatsIdleTimeInMinutes(int maxStatsIdleTimeInMinutes) {
+ this.maxStatsIdleTimeInMinutes = maxStatsIdleTimeInMinutes;
+ }
+
public boolean isLockInStrictMode() {
return lockInStrictMode;
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
index 71c796b283..559bb77953 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItem.java
@@ -31,6 +31,7 @@ public class MomentStatsItem {
private final String statsKey;
private final ScheduledExecutorService scheduledExecutorService;
private final Logger log;
+ private long lastUpdateTimestamp = System.currentTimeMillis();
public MomentStatsItem(String statsName, String statsKey,
ScheduledExecutorService scheduledExecutorService, Logger log) {
@@ -72,4 +73,12 @@ public class MomentStatsItem {
public String getStatsName() {
return statsName;
}
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
index a4571d7b8a..fd65351a54 100644
---
a/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
+++
b/common/src/main/java/org/apache/rocketmq/common/stats/MomentStatsItemSet.java
@@ -24,9 +24,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class MomentStatsItemSet {
+ private static final Logger COMMERCIAL_LOG =
LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
private final ConcurrentMap<String/* key */, MomentStatsItem>
statsItemTable =
new ConcurrentHashMap<>(128);
private final String statsName;
@@ -72,6 +75,13 @@ public class MomentStatsItemSet {
public void setValue(final String statsKey, final int value) {
MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
statsItem.getValue().set(value);
+ statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
+ }
+
+ public void setValue(final String statsKey, final long value) {
+ MomentStatsItem statsItem = this.getAndCreateStatsItem(statsKey);
+ statsItem.getValue().set(value);
+ statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
}
public void delValueByInfixKey(final String statsKey, String separator) {
@@ -109,4 +119,17 @@ public class MomentStatsItemSet {
return statsItem;
}
+
+    /**
+     * Removes every item of this set that has not been updated within the given idle limit.
+     *
+     * @param maxStatsIdleTimeInMinutes idle limit in minutes; an item is removed once the time elapsed
+     *                                  since its last update timestamp exceeds this limit
+     */
+    public void cleanResource(int maxStatsIdleTimeInMinutes) {
+        COMMERCIAL_LOG.info("CleanStatisticItem: kind:{}, size:{}", statsName, this.statsItemTable.size());
+        // Convert with TimeUnit so the arithmetic is done in long. The previous expression
+        // "minutes * 60 * 1000L" multiplied in int first and wrapped around for limits above
+        // Integer.MAX_VALUE / 60, producing a negative threshold that purged every item.
+        final long maxIdleMillis = TimeUnit.MINUTES.toMillis(maxStatsIdleTimeInMinutes);
+        Iterator<Entry<String, MomentStatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            MomentStatsItem statsItem = it.next().getValue();
+            if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxIdleMillis) {
+                it.remove();
+                COMMERCIAL_LOG.info("CleanStatisticItem: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey());
+            }
+        }
+    }
}
diff --git
a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
index 8307c20aa6..cc5de16095 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItem.java
@@ -38,6 +38,7 @@ public class StatsItem {
private final String statsName;
private final String statsKey;
+ private long lastUpdateTimestamp = System.currentTimeMillis();
private final ScheduledExecutorService scheduledExecutorService;
private final Logger logger;
@@ -229,6 +230,14 @@ public class StatsItem {
public LongAdder getTimes() {
return times;
}
+
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
+ public void setLastUpdateTimestamp(long lastUpdateTimestamp) {
+ this.lastUpdateTimestamp = lastUpdateTimestamp;
+ }
}
class CallSnapshot {
diff --git
a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
index c5b140b5cc..8ed1486e9f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
+++ b/common/src/main/java/org/apache/rocketmq/common/stats/StatsItemSet.java
@@ -24,9 +24,12 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
public class StatsItemSet {
+ private static final Logger COMMERCIAL_LOG =
LoggerFactory.getLogger(LoggerName.COMMERCIAL_LOGGER_NAME);
private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
new ConcurrentHashMap<>(128);
@@ -157,12 +160,14 @@ public class StatsItemSet {
StatsItem statsItem = this.getAndCreateStatsItem(statsKey);
statsItem.getValue().add(incValue);
statsItem.getTimes().add(incTimes);
+ statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
}
public void addRTValue(final String statsKey, final int incValue, final
int incTimes) {
StatsItem statsItem = this.getAndCreateRTStatsItem(statsKey);
statsItem.getValue().add(incValue);
statsItem.getTimes().add(incTimes);
+ statsItem.setLastUpdateTimestamp(System.currentTimeMillis());
}
public void delValue(final String statsKey) {
@@ -256,4 +261,18 @@ public class StatsItemSet {
public StatsItem getStatsItem(final String statsKey) {
return this.statsItemTable.get(statsKey);
}
+
+
+    /**
+     * Removes every item of this set that has not been updated within the given idle limit.
+     *
+     * @param maxStatsIdleTimeInMinutes idle limit in minutes; an item is removed once the time elapsed
+     *                                  since its last update timestamp exceeds this limit
+     */
+    public void cleanResource(int maxStatsIdleTimeInMinutes) {
+        COMMERCIAL_LOG.info("CleanStatisticItemOld: kind:{}, size:{}", statsName, this.statsItemTable.size());
+        // Convert with TimeUnit so the arithmetic is done in long. The previous expression
+        // "minutes * 60 * 1000L" multiplied in int first and wrapped around for limits above
+        // Integer.MAX_VALUE / 60, producing a negative threshold that purged every item.
+        final long maxIdleMillis = TimeUnit.MINUTES.toMillis(maxStatsIdleTimeInMinutes);
+        Iterator<Entry<String, StatsItem>> it = this.statsItemTable.entrySet().iterator();
+        while (it.hasNext()) {
+            StatsItem statsItem = it.next().getValue();
+            if (System.currentTimeMillis() - statsItem.getLastUpdateTimestamp() > maxIdleMillis) {
+                it.remove();
+                COMMERCIAL_LOG.info("CleanStatisticItemOld: removeKind:{}, removeKey:{}", statsName, statsItem.getStatsKey());
+            }
+        }
+    }
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
index a6c33f6131..530339c23b 100644
---
a/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
+++
b/store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java
@@ -18,6 +18,8 @@ package org.apache.rocketmq.store.stats;
import java.util.HashMap;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.ThreadFactoryImpl;
@@ -121,6 +123,8 @@ public class BrokerStatsManager {
public static final String CHANNEL_ACTIVITY_IDLE = "IDLE";
public static final String CHANNEL_ACTIVITY_EXCEPTION = "EXCEPTION";
public static final String CHANNEL_ACTIVITY_CLOSE = "CLOSE";
+ private static final String[] NEED_CLEAN_STATS_SET =
+ new String[] {TOPIC_PUT_NUMS, TOPIC_PUT_SIZE, GROUP_GET_NUMS,
GROUP_GET_SIZE, SNDBCK_PUT_NUMS, GROUP_GET_LATENCY};
/**
* read disk follow stats
@@ -134,6 +138,7 @@ public class BrokerStatsManager {
private ScheduledExecutorService scheduledExecutorService;
private ScheduledExecutorService commercialExecutor;
private ScheduledExecutorService accountExecutor;
+ private ScheduledExecutorService cleanResourceExecutor;
private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
private final String clusterName;
@@ -277,6 +282,12 @@ public class BrokerStatsManager {
return false;
}
});
+ cleanResourceExecutor.scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ cleanAllResource();
+ }
+ }, 10, 10, TimeUnit.MINUTES);
}
private void initScheduleService() {
@@ -286,6 +297,8 @@ public class BrokerStatsManager {
ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("CommercialStatsThread", true, brokerConfig));
this.accountExecutor =
ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("AccountStatsThread", true, brokerConfig));
+ this.cleanResourceExecutor =
+ ThreadUtils.newSingleThreadScheduledExecutor(new
ThreadFactoryImpl("CleanStatsResourceThread", true, brokerConfig));
}
public MomentStatsItemSet getMomentStatsItemSetFallSize() {
@@ -318,6 +331,7 @@ public class BrokerStatsManager {
+ /**
+  * Stops the schedulers created by initScheduleService().
+  */
public void shutdown() {
this.scheduledExecutorService.shutdown();
this.commercialExecutor.shutdown();
+ // accountExecutor is created in initScheduleService() together with the other schedulers;
+ // stop it here as well so its thread is not left running after shutdown.
+ this.accountExecutor.shutdown();
+ this.cleanResourceExecutor.shutdown();
}
public StatsItem getStatsItem(final String statsName, final String
statsKey) {
@@ -589,13 +603,13 @@ public class BrokerStatsManager {
public void recordDiskFallBehindTime(final String group, final String
topic, final int queueId,
final long fallBehind) {
final String statsKey = buildStatsKey(queueId, topic, group);
-
this.momentStatsItemSetFallTime.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+ this.momentStatsItemSetFallTime.setValue(statsKey, fallBehind);
}
public void recordDiskFallBehindSize(final String group, final String
topic, final int queueId,
final long fallBehind) {
final String statsKey = buildStatsKey(queueId, topic, group);
-
this.momentStatsItemSetFallSize.getAndCreateStatsItem(statsKey).getValue().set(fallBehind);
+ this.momentStatsItemSetFallSize.setValue(statsKey, fallBehind);
}
public void incDLQStatValue(final String key, final String owner, final
String group,
@@ -764,6 +778,31 @@ public class BrokerStatsManager {
boolean online(String instanceId, String group, String topic);
}
+
+    /**
+     * Runs one cleanup pass over the stats sets named in NEED_CLEAN_STATS_SET and over the two
+     * fall-behind moment sets.
+     *
+     * The idle limit is read from brokerConfig on every pass. A missing config or a negative
+     * limit skips the pass; a limit from 0 to 10 minutes is replaced by 30 minutes. Any failure
+     * is logged and not rethrown.
+     */
+    private void cleanAllResource() {
+        try {
+            final int configuredMinutes =
+                null == brokerConfig ? -1 : brokerConfig.getMaxStatsIdleTimeInMinutes();
+            if (configuredMinutes < 0) {
+                COMMERCIAL_LOG.info("[BrokerStatsManager#cleanAllResource] maxStatsIdleTimeInMinutes={}, no need to clean resource", configuredMinutes);
+                return;
+            }
+            // Negative values have already returned above, so only the upper bound needs checking.
+            final int idleMinutes = configuredMinutes <= 10 ? 30 : configuredMinutes;
+            for (String statsKind : NEED_CLEAN_STATS_SET) {
+                StatsItemSet statsItemSet = this.statsTable.get(statsKind);
+                if (statsItemSet != null) {
+                    statsItemSet.cleanResource(idleMinutes);
+                }
+            }
+            momentStatsItemSetFallSize.cleanResource(idleMinutes);
+            momentStatsItemSetFallTime.cleanResource(idleMinutes);
+        } catch (Throwable throwable) {
+            COMMERCIAL_LOG.error("[BrokerStatsManager#cleanAllResource] clean resource error", throwable);
+        }
+    }
+
public enum StatsType {
SEND_SUCCESS,
SEND_FAILURE,