This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b6cc188bfa [ISSUE #9992] Fix remoting server netty server codec thread reuse problem (#9993)
b6cc188bfa is described below
commit b6cc188bfa5cd35435a34f13f8b3760684c1a8cc
Author: ymwneu <[email protected]>
AuthorDate: Wed Jan 14 10:06:22 2026 +0800
[ISSUE #9992] Fix remoting server netty server codec thread reuse problem (#9993)
Co-authored-by: maowei.ymw <[email protected]>
---
.../rocketmq/remoting/netty/NettyRemotingServer.java | 18 +++++++++---------
1 file changed, 9 insertions(+), 9 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index d56d6faa33..be02d0f9a9 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -272,9 +272,9 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
*/
protected ChannelPipeline configChannel(SocketChannel ch) {
return ch.pipeline()
- .addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ?
defaultEventExecutorGroup : null,
+ .addLast(getDefaultEventExecutorGroup(),
HANDSHAKE_HANDLER_NAME, new HandshakeHandler())
- .addLast(nettyServerConfig.isServerNettyWorkerGroupEnable() ?
defaultEventExecutorGroup : null,
+ .addLast(getDefaultEventExecutorGroup(),
encoder,
new NettyDecoder(),
distributionHandler,
@@ -430,7 +430,7 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
}
public DefaultEventExecutorGroup getDefaultEventExecutorGroup() {
- return defaultEventExecutorGroup;
+ return nettyServerConfig.isServerNettyWorkerGroupEnable() ?
defaultEventExecutorGroup : null;
}
public NettyEncoder getEncoder() {
@@ -462,11 +462,11 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
return;
}
if (detectionResult.state() ==
ProtocolDetectionState.DETECTED) {
- ctx.pipeline().addAfter(defaultEventExecutorGroup,
ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
- .addAfter(defaultEventExecutorGroup, HA_PROXY_DECODER,
HA_PROXY_HANDLER, new HAProxyMessageHandler())
- .addAfter(defaultEventExecutorGroup, HA_PROXY_HANDLER,
TLS_MODE_HANDLER, tlsModeHandler);
+ ctx.pipeline().addAfter(getDefaultEventExecutorGroup(),
ctx.name(), HA_PROXY_DECODER, new HAProxyMessageDecoder())
+ .addAfter(getDefaultEventExecutorGroup(),
HA_PROXY_DECODER, HA_PROXY_HANDLER, new HAProxyMessageHandler())
+ .addAfter(getDefaultEventExecutorGroup(),
HA_PROXY_HANDLER, TLS_MODE_HANDLER, tlsModeHandler);
} else {
- ctx.pipeline().addAfter(defaultEventExecutorGroup,
ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
+ ctx.pipeline().addAfter(getDefaultEventExecutorGroup(),
ctx.name(), TLS_MODE_HANDLER, tlsModeHandler);
}
try {
@@ -509,8 +509,8 @@ public class NettyRemotingServer extends
NettyRemotingAbstract implements Remoti
case ENFORCING:
if (null != sslContext) {
ctx.pipeline()
- .addAfter(defaultEventExecutorGroup,
TLS_MODE_HANDLER, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
- .addAfter(defaultEventExecutorGroup,
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
+ .addAfter(getDefaultEventExecutorGroup(),
TLS_MODE_HANDLER, TLS_HANDLER_NAME,
sslContext.newHandler(ctx.channel().alloc()))
+ .addAfter(getDefaultEventExecutorGroup(),
TLS_HANDLER_NAME, FILE_REGION_ENCODER_NAME, new FileRegionEncoder());
log.info("Handlers prepended to channel pipeline
to establish SSL connection");
} else {
ctx.close();