This is an automated email from the ASF dual-hosted git repository.

scarb pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/4.9.x by this push:
     new 11e049ca14 fix producer groupChannelTable memory leak (#8672)
11e049ca14 is described below

commit 11e049ca1448609cc6c733e323a060af573438e6
Author: yuz10 <[email protected]>
AuthorDate: Mon Sep 9 19:37:12 2024 +0800

    fix producer groupChannelTable memory leak (#8672)
---
 .../org/apache/rocketmq/broker/client/ProducerManager.java | 14 +++++++++++---
 1 file changed, 11 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index e8f45e7023..91d4572309 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -78,8 +78,11 @@ public class ProducerManager {
     }
 
     public void scanNotActiveChannel() {
-        for (final Map.Entry<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>> entry : this.groupChannelTable
-                .entrySet()) {
+        Iterator<Map.Entry<String, ConcurrentHashMap<Channel, 
ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
+
+        while (iterator.hasNext()) {
+            Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>> 
entry = iterator.next();
+
             final String group = entry.getKey();
             final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap = 
entry.getValue();
 
@@ -94,11 +97,16 @@ public class ProducerManager {
                     it.remove();
                     clientChannelTable.remove(info.getClientId());
                     log.warn(
-                            "SCAN: remove expired channel[{}] from 
ProducerManager groupChannelTable, producer group name: {}",
+                            "ProducerManager#scanNotActiveChannel: remove 
expired channel[{}] from ProducerManager groupChannelTable, producer group 
name: {}",
                             
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
                     RemotingUtil.closeChannel(info.getChannel());
                 }
             }
+
+            if (chlMap.isEmpty()) {
+                log.warn("SCAN: remove expired channel from ProducerManager 
groupChannelTable, all clear, group={}", group);
+                iterator.remove();
+            }
         }
     }
 

Reply via email to