This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 32d68e7856 [ISSUE #9581] Optimize the resource bloat of pollingMap and 
topicCidMap in LMQ scenarios (#9579)
32d68e7856 is described below

commit 32d68e78567903c07139cfdb519a7cd2a6222e2a
Author: rongtong <[email protected]>
AuthorDate: Mon Sep 8 13:49:48 2025 +0800

    [ISSUE #9581] Optimize the resource bloat of pollingMap and topicCidMap in 
LMQ scenarios (#9579)
    
    * Optimize the resource bloat of pollingMap and topicCidMap in LMQ scenarios
    
    * Fix PopLongPollingServiceTest some tests can not pass
---
 .../broker/longpolling/PopLongPollingService.java  | 75 +++++++++++-----------
 .../broker/processor/PollingInfoProcessor.java     |  2 +-
 .../broker/processor/PopMessageProcessor.java      |  4 +-
 .../longpolling/PopLongPollingServiceTest.java     | 22 +++++--
 .../org/apache/rocketmq/common/BrokerConfig.java   | 10 +++
 5 files changed, 64 insertions(+), 49 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index e87a8e803f..71520b8219 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -17,13 +17,15 @@
 
 package org.apache.rocketmq.broker.longpolling;
 
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import io.netty.channel.ChannelHandlerContext;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.KeyBuilder;
@@ -52,21 +54,27 @@ public class PopLongPollingService extends ServiceThread {
         LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     private final BrokerController brokerController;
     private final NettyRequestProcessor processor;
-    private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, 
Byte>> topicCidMap;
-    private final ConcurrentLinkedHashMap<String, 
ConcurrentSkipListSet<PopRequest>> pollingMap;
+    private final Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap;
+    private final Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap;
     private long lastCleanTime = 0;
 
     private final AtomicLong totalPollingNum = new AtomicLong(0);
     private final boolean notifyLast;
 
-    public PopLongPollingService(BrokerController brokerController, 
NettyRequestProcessor processor, boolean notifyLast) {
+    public PopLongPollingService(BrokerController brokerController, 
NettyRequestProcessor processor,
+        boolean notifyLast) {
         this.brokerController = brokerController;
         this.processor = processor;
         // 100000 topic default,  100000 lru topic + cid + qid
-        this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentHashMap<String, Byte>>()
-            
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()
 * 2L).build();
-        this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>()
-            
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+        this.topicCidMap = Caffeine.newBuilder()
+            
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 
2L)
+            
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(),
 TimeUnit.SECONDS)
+            .build();
+
+        this.pollingMap = Caffeine.newBuilder()
+            
.maximumSize(this.brokerController.getBrokerConfig().getPopPollingMapSize())
+            
.expireAfterAccess(this.brokerController.getBrokerConfig().getPopPollingMapExpireTimeSeconds(),
 TimeUnit.SECONDS)
+            .build();
         this.notifyLast = notifyLast;
     }
 
@@ -85,11 +93,11 @@ public class PopLongPollingService extends ServiceThread {
             try {
                 this.waitForRunning(20);
                 i++;
-                if (pollingMap.isEmpty()) {
+                if (pollingMap.estimatedSize() == 0) {
                     continue;
                 }
                 long tmpTotalPollingNum = 0;
-                for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> 
entry : pollingMap.entrySet()) {
+                for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> 
entry : pollingMap.asMap().entrySet()) {
                     String key = entry.getKey();
                     ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
                     if (popQ == null) {
@@ -126,7 +134,7 @@ public class PopLongPollingService extends ServiceThread {
 
                 if (i >= 100) {
                     
POP_LOGGER.info("pollingMapSize={},tmpTotalSize={},atomicTotalSize={},diffSize={}",
-                        pollingMap.size(), tmpTotalPollingNum, 
totalPollingNum.get(),
+                        pollingMap.estimatedSize(), tmpTotalPollingNum, 
totalPollingNum.get(),
                         Math.abs(totalPollingNum.get() - tmpTotalPollingNum));
                     totalPollingNum.set(tmpTotalPollingNum);
                     i = 0;
@@ -142,7 +150,7 @@ public class PopLongPollingService extends ServiceThread {
         }
         // clean all;
         try {
-            for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : 
pollingMap.entrySet()) {
+            for (Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry : 
pollingMap.asMap().entrySet()) {
                 ConcurrentSkipListSet<PopRequest> popQ = entry.getValue();
                 PopRequest first;
                 while ((first = popQ.pollFirst()) != null) {
@@ -170,7 +178,7 @@ public class PopLongPollingService extends ServiceThread {
 
     public void notifyMessageArriving(final String topic, final int queueId, 
long offset,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties) {
-        ConcurrentHashMap<String, Byte> cids = topicCidMap.get(topic);
+        ConcurrentHashMap<String, Byte> cids = topicCidMap.getIfPresent(topic);
         if (cids == null) {
             return;
         }
@@ -196,7 +204,7 @@ public class PopLongPollingService extends ServiceThread {
 
     public boolean notifyMessageArriving(final String topic, final int 
queueId, final String cid, boolean force,
         Long tagsCode, long msgStoreTime, byte[] filterBitMap, Map<String, 
String> properties, CommandCallback callback) {
-        ConcurrentSkipListSet<PopRequest> remotingCommands = 
pollingMap.get(KeyBuilder.buildPollingKey(topic, cid, queueId));
+        ConcurrentSkipListSet<PopRequest> remotingCommands = 
pollingMap.getIfPresent(KeyBuilder.buildPollingKey(topic, cid, queueId));
         if (remotingCommands == null || remotingCommands.isEmpty()) {
             return false;
         }
@@ -286,14 +294,7 @@ public class PopLongPollingService extends ServiceThread {
         if (requestHeader.getPollTime() <= 0 || this.isStopped()) {
             return NOT_POLLING;
         }
-        ConcurrentHashMap<String, Byte> cids = 
topicCidMap.get(requestHeader.getTopic());
-        if (cids == null) {
-            cids = new ConcurrentHashMap<>();
-            ConcurrentHashMap<String, Byte> old = 
topicCidMap.putIfAbsent(requestHeader.getTopic(), cids);
-            if (old != null) {
-                cids = old;
-            }
-        }
+        ConcurrentHashMap<String, Byte> cids = 
topicCidMap.get(requestHeader.getTopic(), key -> new ConcurrentHashMap<>());
         cids.putIfAbsent(requestHeader.getConsumerGroup(), Byte.MIN_VALUE);
         long expired = requestHeader.getBornTime() + 
requestHeader.getPollTime();
         final PopRequest request = new PopRequest(remotingCommand, ctx, 
expired, subscriptionData, messageFilter);
@@ -311,21 +312,13 @@ public class PopLongPollingService extends ServiceThread {
         }
         String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(),
             requestHeader.getQueueId());
-        ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key);
-        if (queue == null) {
-            queue = new ConcurrentSkipListSet<>(PopRequest.COMPARATOR);
-            ConcurrentSkipListSet<PopRequest> old = 
pollingMap.putIfAbsent(key, queue);
-            if (old != null) {
-                queue = old;
-            }
-        } else {
-            // check size
-            int size = queue.size();
-            if (size > brokerController.getBrokerConfig().getPopPollingSize()) 
{
-                POP_LOGGER.info("polling {}, result POLLING_FULL, 
singleSize:{}", remotingCommand, size);
-                return POLLING_FULL;
-            }
+        ConcurrentSkipListSet<PopRequest> queue = pollingMap.get(key, k -> new 
ConcurrentSkipListSet<>(PopRequest.COMPARATOR));
+        int size = queue.size();
+        if (size > brokerController.getBrokerConfig().getPopPollingSize()) {
+            POP_LOGGER.info("polling {}, result POLLING_FULL, singleSize:{}", 
remotingCommand, size);
+            return POLLING_FULL;
         }
+
         if (queue.add(request)) {
             remotingCommand.setSuspended(true);
             totalPollingNum.incrementAndGet();
@@ -339,14 +332,18 @@ public class PopLongPollingService extends ServiceThread {
         }
     }
 
-    public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> 
getPollingMap() {
+    public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
         return pollingMap;
     }
 
+    public Cache<String, ConcurrentHashMap<String, Byte>> getTopicCidMap() {
+        return topicCidMap;
+    }
+
     private void cleanUnusedResource() {
         try {
             {
-                Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> 
topicCidMapIter = topicCidMap.entrySet().iterator();
+                Iterator<Map.Entry<String, ConcurrentHashMap<String, Byte>>> 
topicCidMapIter = topicCidMap.asMap().entrySet().iterator();
                 while (topicCidMapIter.hasNext()) {
                     Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = 
topicCidMapIter.next();
                     String topic = entry.getKey();
@@ -368,7 +365,7 @@ public class PopLongPollingService extends ServiceThread {
             }
 
             {
-                Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> 
pollingMapIter = pollingMap.entrySet().iterator();
+                Iterator<Map.Entry<String, ConcurrentSkipListSet<PopRequest>>> 
pollingMapIter = pollingMap.asMap().entrySet().iterator();
                 while (pollingMapIter.hasNext()) {
                     Map.Entry<String, ConcurrentSkipListSet<PopRequest>> entry 
= pollingMapIter.next();
                     if (entry.getKey() == null) {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
index f7baac144e..c114f4d4c3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PollingInfoProcessor.java
@@ -106,7 +106,7 @@ public class PollingInfoProcessor implements 
NettyRequestProcessor {
             return response;
         }
         String key = KeyBuilder.buildPollingKey(requestHeader.getTopic(), 
requestHeader.getConsumerGroup(), requestHeader.getQueueId());
-        ConcurrentSkipListSet<PopRequest> queue = 
this.brokerController.getPopMessageProcessor().getPollingMap().get(key);
+        ConcurrentSkipListSet<PopRequest> queue = 
this.brokerController.getPopMessageProcessor().getPollingMap().getIfPresent(key);
         if (queue != null) {
             responseHeader.setPollingNum(queue.size());
         } else {
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
index 73f442bcd6..7d98705576 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopMessageProcessor.java
@@ -17,7 +17,7 @@
 package org.apache.rocketmq.broker.processor;
 
 import com.alibaba.fastjson.JSON;
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.github.benmanes.caffeine.cache.Cache;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
@@ -173,7 +173,7 @@ public class PopMessageProcessor implements 
NettyRequestProcessor {
         return false;
     }
 
-    public ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> 
getPollingMap() {
+    public Cache<String, ConcurrentSkipListSet<PopRequest>> getPollingMap() {
         return popLongPollingService.getPollingMap();
     }
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
index 003bf09842..3547687a6d 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/longpolling/PopLongPollingServiceTest.java
@@ -16,9 +16,11 @@
  */
 package org.apache.rocketmq.broker.longpolling;
 
-import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
+import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.reflect.FieldUtils;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.BrokerConfig;
@@ -102,14 +104,18 @@ public class PopLongPollingServiceTest {
     public void testNotifyMessageArrivingValidRequest() throws Exception {
         String cid = "CID_1";
         int queueId = 0;
-        ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> 
topicCidMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentHashMap<String, Byte>>()
-            .maximumWeightedCapacity(10).build();
+        Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap = 
Caffeine.newBuilder()
+            .maximumSize(10)
+            .expireAfterAccess(300, TimeUnit.SECONDS)
+            .build();
         ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
         cids.put(cid, (byte) 1);
         topicCidMap.put(defaultTopic, cids);
         popLongPollingService = new PopLongPollingService(brokerController, 
processor, true);
-        ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> 
pollingMap =
-            new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>().maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
+        Cache<String, ConcurrentSkipListSet<PopRequest>> pollingMap = 
Caffeine.newBuilder()
+            .maximumSize(10)
+            .expireAfterAccess(300, TimeUnit.SECONDS)
+            .build();
         Channel channel = mock(Channel.class);
         when(channel.isActive()).thenReturn(true);
         PopRequest popRequest = mock(PopRequest.class);
@@ -195,8 +201,10 @@ public class PopLongPollingServiceTest {
         when(requestHeader.getPollTime()).thenReturn(1000L);
         when(requestHeader.getTopic()).thenReturn(defaultTopic);
         when(requestHeader.getConsumerGroup()).thenReturn("defaultGroup");
-        ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> 
topicCidMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentHashMap<String, Byte>>()
-            .maximumWeightedCapacity(10).build();
+        Cache<String, ConcurrentHashMap<String, Byte>> topicCidMap = 
Caffeine.newBuilder()
+            .maximumSize(10)
+            .expireAfterAccess(300, TimeUnit.SECONDS)
+            .build();
         ConcurrentHashMap<String, Byte> cids = new ConcurrentHashMap<>();
         cids.put(cid, (byte) 1);
         topicCidMap.put(defaultTopic, cids);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index cee64f623b..04828da64d 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -214,6 +214,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private int popPollingSize = 1024;
     private int popPollingMapSize = 100000;
+
+    private int popPollingMapExpireTimeSeconds = 60 * 10;
     // 20w cost 200M heap memory.
     private long maxPopPollingSize = 100000;
     private int reviveQueueNum = 8;
@@ -533,6 +535,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.popPollingMapSize = popPollingMapSize;
     }
 
+    public int getPopPollingMapExpireTimeSeconds() {
+        return popPollingMapExpireTimeSeconds;
+    }
+
+    public void setPopPollingMapExpireTimeSeconds(int 
popPollingMapExpireTimeSeconds) {
+        this.popPollingMapExpireTimeSeconds = popPollingMapExpireTimeSeconds;
+    }
+
     public long getReviveScanTime() {
         return reviveScanTime;
     }

Reply via email to