This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 7acfe4dca1 [ISSUE #9737] Fix client memory leak on connection failure (#9740)
7acfe4dca1 is described below
commit 7acfe4dca1ac38c79fbe78fc7e6f0e76fbeb21df
Author: qianye <[email protected]>
AuthorDate: Wed Nov 19 16:22:07 2025 +0800
[ISSUE #9737] Fix client memory leak on connection failure (#9740)
---
.../remoting/netty/NettyRemotingClient.java | 34 ++++++++++++----------
1 file changed, 18 insertions(+), 16 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 973d229bef..7ed977d99c 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -40,6 +40,7 @@ import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.resolver.NoopAddressResolverGroup;
+import io.netty.util.AttributeKey;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
@@ -96,6 +97,9 @@ import static
org.apache.rocketmq.remoting.common.RemotingHelper.convertChannelF
public class NettyRemotingClient extends NettyRemotingAbstract implements
RemotingClient {
private static final Logger LOGGER =
LoggerFactory.getLogger(LoggerName.ROCKETMQ_REMOTING_NAME);
+ private static final AttributeKey<ChannelWrapper>
CHANNEL_WRAPPER_ATTRIBUTE_KEY = AttributeKey.valueOf(
+ "channelWrapper");
+
private static final long LOCK_TIMEOUT_MILLIS = 3000;
private static final long MIN_CLOSE_TIMEOUT_MILLIS = 100;
@@ -106,7 +110,6 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
private final Map<String /* cidr */, SocksProxyConfig /* proxy */>
proxyMap = new HashMap<>();
private final ConcurrentHashMap<String /* cidr */, Bootstrap> bootstrapMap
= new ConcurrentHashMap<>();
private final ConcurrentMap<String /* addr */, ChannelWrapper>
channelTables = new ConcurrentHashMap<>();
- private final ConcurrentMap<Channel, ChannelWrapper> channelWrapperTables
= new ConcurrentHashMap<>();
private final HashedWheelTimer timer = new HashedWheelTimer(r -> new
Thread(r, "ClientHouseKeepingService"));
@@ -381,7 +384,6 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
channel.getValue().close();
}
- this.channelWrapperTables.clear();
this.channelTables.clear();
this.eventLoopGroupWorker.shutdownGracefully();
@@ -439,7 +441,8 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
if (removeItemFromTable) {
- ChannelWrapper channelWrapper =
this.channelWrapperTables.remove(channel);
+ ChannelWrapper channelWrapper =
+
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel);
if (channelWrapper != null &&
channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
@@ -487,7 +490,8 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
}
if (removeItemFromTable) {
- ChannelWrapper channelWrapper =
this.channelWrapperTables.remove(channel);
+ ChannelWrapper channelWrapper =
+
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY, channel);
if (channelWrapper != null &&
channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
@@ -724,7 +728,6 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
LOGGER.info("createChannel: begin to connect remote host[{}]
asynchronously", addr);
ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
- this.channelWrapperTables.put(channelFuture.channel(), cw);
return cw;
}
@@ -831,17 +834,12 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
if (response.getCode() == ResponseCode.GO_AWAY) {
if (nettyClientConfig.isEnableReconnectForGoAway()) {
LOGGER.info("Receive go away from channelId={},
channel={}", channel.id(), channel);
- ChannelWrapper channelWrapper =
channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
- try {
- if (channelWrapper0.reconnect(channel0)) {
- LOGGER.info("Receive go away from
channelId={}, channel={}, recreate the channelId={}", channel0.id(), channel0,
channelWrapper0.getChannel().id());
-
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
- }
- } catch (Throwable t) {
- LOGGER.error("Channel {} reconnect error",
channelWrapper0, t);
- }
- return channelWrapper0;
- });
+ ChannelWrapper channelWrapper =
RemotingHelper.getAttributeValue(CHANNEL_WRAPPER_ATTRIBUTE_KEY,
+ channel);
+ if (channelWrapper != null &&
channelWrapper.reconnect(channel)) {
+ LOGGER.info("Receive go away from channelId={},
channel={}, recreate the channelId={}",
+ channel.id(), channel,
channelWrapper.getChannel().id());
+ }
if (channelWrapper != null &&
!channelWrapper.isWrapperOf(channel)) {
RemotingCommand retryRequest =
RemotingCommand.createRequestCommand(request.getCode(),
request.readCustomHeader());
retryRequest.setBody(request.getBody());
@@ -1006,6 +1004,7 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
this.channelFuture = channelFuture;
this.lastResponseTime = System.currentTimeMillis();
this.channelAddress = address;
+ RemotingHelper.setPropertyToAttr(channelFuture.channel(),
CHANNEL_WRAPPER_ATTRIBUTE_KEY, this);
}
public boolean isOK() {
@@ -1055,10 +1054,13 @@ public class NettyRemotingClient extends
NettyRemotingAbstract implements Remoti
if (isWrapperOf(channel)) {
channelToClose = channelFuture;
channelFuture = doConnect(channelAddress);
+
RemotingHelper.setPropertyToAttr(channelFuture.channel(),
CHANNEL_WRAPPER_ATTRIBUTE_KEY, this);
return true;
} else {
LOGGER.warn("channelWrapper has reconnect, so do
nothing, now channelId={}, input channelId={}",getChannel().id(), channel.id());
}
+ } catch (Throwable t) {
+ LOGGER.error("ChannelWrapper {} reconnect error", this, t);
} finally {
lock.writeLock().unlock();
}