This is an automated email from the ASF dual-hosted git repository.
scarb pushed a commit to branch 4.9.x
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/4.9.x by this push:
new 11e049ca14 fix producer groupChannelTable memory leak (#8672)
11e049ca14 is described below
commit 11e049ca1448609cc6c733e323a060af573438e6
Author: yuz10 <[email protected]>
AuthorDate: Mon Sep 9 19:37:12 2024 +0800
fix producer groupChannelTable memory leak (#8672)
---
.../org/apache/rocketmq/broker/client/ProducerManager.java | 14 +++++++++++---
1 file changed, 11 insertions(+), 3 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
index e8f45e7023..91d4572309 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/ProducerManager.java
@@ -78,8 +78,11 @@ public class ProducerManager {
}
public void scanNotActiveChannel() {
- for (final Map.Entry<String, ConcurrentHashMap<Channel,
ClientChannelInfo>> entry : this.groupChannelTable
- .entrySet()) {
+ Iterator<Map.Entry<String, ConcurrentHashMap<Channel,
ClientChannelInfo>>> iterator = this.groupChannelTable.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, ConcurrentHashMap<Channel, ClientChannelInfo>>
entry = iterator.next();
+
final String group = entry.getKey();
final ConcurrentHashMap<Channel, ClientChannelInfo> chlMap =
entry.getValue();
@@ -94,11 +97,16 @@ public class ProducerManager {
it.remove();
clientChannelTable.remove(info.getClientId());
log.warn(
- "SCAN: remove expired channel[{}] from
ProducerManager groupChannelTable, producer group name: {}",
+ "ProducerManager#scanNotActiveChannel: remove
expired channel[{}] from ProducerManager groupChannelTable, producer group
name: {}",
RemotingHelper.parseChannelRemoteAddr(info.getChannel()), group);
RemotingUtil.closeChannel(info.getChannel());
}
}
+
+ if (chlMap.isEmpty()) {
+ log.warn("SCAN: remove expired channel from ProducerManager
groupChannelTable, all clear, group={}", group);
+ iterator.remove();
+ }
}
}