This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 32d68e7856 [ISSUE #9581] Optimize the resource bloat of pollingMap and
topicCidMap in LMQ scenarios (#9579)
32d68e7856 is described below
commit 32d68e78567903c07139cfdb519a7cd2a6222e2a
Author: rongtong <[email protected]>
AuthorDate: Mon Sep 8 13:49:48 2025 +0800
[ISSUE #9581] Optimize the resource bloat of pollingMap and topicCidMap in
LMQ scenarios (#9579)
* Optimize the resource bloat of pollingMap and topicCidMap in LMQ scenarios
* Fix some PopLongPollingServiceTest tests that could not pass
---
.../broker/longpolling/PopLongPollingService.java | 75 +++++++++++-----------
.../broker/processor/PollingInfoProcessor.java | 2 +-
.../broker/processor/PopMessageProcessor.java | 4 +-
.../longpolling/PopLongPollingServiceTest.java | 22 +++++--
.../org/apache/rocketmq/common/BrokerConfig.java | 10 +++
5 files changed, 64 insertions(+), 49 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index e87a8e803f..71520b8219 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -17,13 +17,15 @@
package org.apache.rocketmq.broker.longpolling;
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.ChannelHandlerContext;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.KeyBuilder;
@@ -52,21 +54,27 @@ public class PopLongPollingService extends ServiceThread {
LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
private final BrokerController brokerController;
private final NettyRequestProcessor processor;
- private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String,
Byte>> topicCidMap;
- private final ConcurrentLinkedHashMap<String,
ConcurrentSkipListSet<PopRequest>> pollingMap;
+ private final Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap;
+ private final Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
private long lastCleanTime = 0;
private final AtomicLong totalPollingNum = new AtomicLong(0);
private final boolean notifyLast;
- public PopLongPollingService(BrokerController brokerController,
NettyRequestProcessor processor, boolean notifyLast) {
+ public PopLongPollingService(BrokerController brokerController,
NettyRequestProcessor processor,
+ boolean notifyLast) {
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
- this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentHashMap<String, Byte>>()
-
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()
* 2L).build();
- this.pollingMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentSkipListSet<PopRequest>>()
-
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+ this.topicCidMap = Caffeine.newBuilder()
+
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize() *
2L)
+
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(),
TimeUnit.SECONDS)
+ .build();
+
+ this.pollingMap = Caffeine.newBuilder()
+
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize())
+
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(),
TimeUnit.SECONDS)
+ .build();
this.notifyLast = notifyLast;
}
@@ -85,11 +93,11 @@ public class PopLongPollingService extends ServiceThread {
try {
this.waitForRunning(20);
i++;
- if (pollingMap.isEmpty()) {
+ if (pollingMap.estimatedSize() == 0) {
continue;
}
long tmpTotalPollingNum = 0;
- for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>>
entry : pollingMap.entrySet()) {
+ for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>>
entry : pollingMap.asMap().entrySet()) {
String key = entry.getKey();
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
if (popQ == null) {
@@ -126,7 +134,7 @@ public class PopLongPollingService extends ServiceThread {
if (i >= 100) {
POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}",
- pollingMap.size(), tmpTotalPollingNum,
totalPollingNum.get(),
+ pollingMap.estimatedSize(), tmpTotalPollingNum,
totalPollingNum.get(),
Math.abs(totalPollingNum.get() - tmpTotalPollingNum));
totalPollingNum.set(tmpTotalPollingNum);
i = 0;
@@ -142,7 +150,7 @@ public class PopLongPollingService extends ServiceThread {
}
// clean all;
try {
- for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry :
pollingMap.entrySet()) {
+ for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry :
pollingMap.asMap().entrySet()) {
ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
PopRequest first;
while ((first = popQ.pollFirst()) != null) {
@@ -170,7 +178,7 @@ public class PopLongPollingService extends ServiceThread {
public void notifyMessageArriving(final String topic, final int queueId,
long offset,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties) {
- ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
+ ConcurrentHashMap<String, Byte> cids = topicCidMap.getIfPresent(topic);
if (cids == null) {
return;
}
@@ -196,7 +204,7 @@ public class PopLongPollingService extends ServiceThread {
public boolean notifyMessageArriving(final String topic, final int
queueId, final String cid, boolean force,
Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String,
String> properties, CommandCallback callback) {
- ConcurrentSkipListSet<PopRequest> remotingCommands =
pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
+ ConcurrentSkipListSet<PopRequest> remotingCommands =
pollingMap.getIfPresent(KeyBuilder.buildPollingKey(topic, cid, queueId));
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
}
@@ -286,14 +294,7 @@ public class PopLongPollingService extends ServiceThread {
if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
return NOT_POLLING;
}
- ConcurrentHashMap<String, Byte> cids =
topicCidMap.get(requestHeader.getTopic());
- if (cids == null) {
- cids = new ConcurrentHashMap<>();
- ConcurrentHashMap<String, Byte> old =
topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
- if (old != null) {
- cids = old;
- }
- }
+ ConcurrentHashMap<String, Byte> cids =
topicCidMap.get(requestHeader.getTopic(), key -> new ConcurrentHashMap<>());
cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
long expired = requestHeader.getBornTime() +
requestHeader.getPollTime();
final PopRequest request = new PopRequest(remotingCommand, ctx,
expired, subscriptionData, messageFilter);
@@ -311,21 +312,13 @@ public class PopLongPollingService extends ServiceThread {
}
String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(),
requestHeader.getConsumerGroup(),
requestHeader.getQueueId());
- ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key);
- if (queue == null) {
- queue = new ConcurrentSkipListSet<>(PopRequest.COMPARATOR);
- ConcurrentSkipListSet<PopRequest> old =
pollingMap.putIfAbsent(key, queue);
- if (old != null) {
- queue = old;
- }
- } else {
- // check size
- int size = queue.size();
- if (size > brokerController.getBrokerConfig().getPopPollingSize())
{
- POP_LOGGER.info("polling {}, result POLLING_FULL,
singleSize:{}", remotingCommand, size);
- return POLLING_FULL;
- }
+ ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key, k -> new
ConcurrentSkipListSet<>(PopRequest.COMPARATOR));
+ int size = queue.size();
+ if (size > brokerController.getBrokerConfig().getPopPollingSize()) {
+ POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}",
remotingCommand, size);
+ return POLLING_FULL;
}
+
if (queue.add(request)) {
remotingCommand.setSuspended(true);
totalPollingNum.incrementAndGet();
@@ -339,14 +332,18 @@ public class PopLongPollingService extends ServiceThread {
}
}
- public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>>
getPollingMap() {
+ public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
return pollingMap;
}
+ public Cache<String, ConcurrentHashMap<String, Byte>> getTopicCidMap() {
+ return topicCidMap;
+ }
+
private void cleanUnusedResource() {
try {
{
- Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>>
topicCidMapIter = topicCidMap.entrySet().iterator();
+ Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>>
topicCidMapIter = topicCidMap.asMap().entrySet().iterator();
while (topicCidMapIter.hasNext()) {
Map.Entry<String, ConcurrentHashMap<String, Byte>> entry =
topicCidMapIter.next();
String topic = entry.getKey();
@@ -368,7 +365,7 @@ public class PopLongPollingService extends ServiceThread {
}
{
- Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>>
pollingMapIter = pollingMap.entrySet().iterator();
+ Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>>
pollingMapIter = pollingMap.asMap().entrySet().iterator();
while (pollingMapIter.hasNext()) {
Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry
= pollingMapIter.next();
if (entry.getKey() == null) {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
index f7baac144e..c114f4d4c3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
@@ -106,7 +106,7 @@ public class PollingInfoProcessor implements
NettyRequestProcessor {
return response;
}
String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(),
requestHeader.getConsumerGroup(), requestHeader.getQueueId());
- ConcurrentSkipListSet<PopRequest> queue =
this.brokerController.getPopMessageProcessor().getPollingMap().get(key);
+ ConcurrentSkipListSet<PopRequest> queue =
this.brokerController.getPopMessageProcessor().getPollingMap().getIfPresent(key);
if (queue != null) {
responseHeader.setPollingNum(queue.size());
} else {
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 73f442bcd6..7d98705576 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -17,7 +17,7 @@
package org.apache.rocketmq.broker.processor;
import com.alibaba.fastjson.JSON;
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.github.benmanes.caffeine.cache.Cache;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
@@ -173,7 +173,7 @@ public class PopMessageProcessor implements
NettyRequestProcessor {
return false;
}
- public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>>
getPollingMap() {
+ public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
return popLongPollingService.getPollingMap();
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 003bf09842..3547687a6d 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -16,9 +16,11 @@
*/
package org.apache.rocketmq.broker.longpolling;
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
@@ -102,14 +104,18 @@ public class PopLongPollingServiceTest {
public void testNotifyMessageArrivingValidRequest() throws Exception {
String cid = "CID_1";
int queueId = 0;
- ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>>
topicCidMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentHashMap<String, Byte>>()
- .maximumWeightedCapacity(10).build();
+ Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap =
Caffeine.newBuilder()
+ .maximumSize(10)
+ .expireAfterAccess(300, TimeUnit.SECONDS)
+ .build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
popLongPollingService = new PopLongPollingService(brokerController,
processor, true);
- ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>>
pollingMap =
- new ConcurrentLinkedHashMap.Builder<String,
ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+ Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap =
Caffeine.newBuilder()
+ .maximumSize(10)
+ .expireAfterAccess(300, TimeUnit.SECONDS)
+ .build();
Channel channel = mock(Channel.class);
when(channel.isActive()).thenReturn(true);
PopRequest popRequest = mock(PopRequest.class);
@@ -195,8 +201,10 @@ public class PopLongPollingServiceTest {
when(requestHeader.getPollTime()).thenReturn(1000L);
when(requestHeader.getTopic()).thenReturn(defaultTopic);
when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
- ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>>
topicCidMap = new ConcurrentLinkedHashMap.Builder<String,
ConcurrentHashMap<String, Byte>>()
- .maximumWeightedCapacity(10).build();
+ Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap =
Caffeine.newBuilder()
+ .maximumSize(10)
+ .expireAfterAccess(300, TimeUnit.SECONDS)
+ .build();
ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
cids.put(cid, (byte) 1);
topicCidMap.put(defaultTopic, cids);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index cee64f623b..04828da64d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -214,6 +214,8 @@ public class BrokerConfig extends BrokerIdentity {
private int popPollingSize = 1024;
private int popPollingMapSize = 100000;
+
+ private int popPollingMapExpireTimeSeconds = 60 * 10;
// 20w cost 200M heap memory.
private long maxPopPollingSize = 100000;
private int reviveQueueNum = 8;
@@ -533,6 +535,14 @@ public class BrokerConfig extends BrokerIdentity {
this.popPollingMapSize = popPollingMapSize;
}
+ public int getPopPollingMapExpireTimeSeconds() {
+ return popPollingMapExpireTimeSeconds;
+ }
+
+ public void setPopPollingMapExpireTimeSeconds(int
popPollingMapExpireTimeSeconds) {
+ this.popPollingMapExpireTimeSeconds = popPollingMapExpireTimeSeconds;
+ }
+
public long getReviveScanTime() {
return reviveScanTime;
}