This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 976e9a3edd [ISSUE #10435] Fix shared ClientChannelInfo preventing
inactive channel expiry (#10437)
976e9a3edd is described below
commit 976e9a3edd591e28e190b8eedfbd531cb8fe8997
Author: Chuan <[email protected]>
AuthorDate: Thu Jun 11 15:36:39 2026 +0800
[ISSUE #10435] Fix shared ClientChannelInfo preventing inactive channel
expiry (#10437)
---
.../broker/processor/ClientManageProcessor.java | 25 +++++++---
.../processor/ClientManageProcessorTest.java | 57 ++++++++++++++++++++++
2 files changed, 75 insertions(+), 7 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index b51967e184..cc1f42e4e3 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -78,14 +78,14 @@ public class ClientManageProcessor implements
NettyRequestProcessor {
public RemotingCommand heartBeat(ChannelHandlerContext ctx,
RemotingCommand request) {
RemotingCommand response = RemotingCommand.createResponseCommand(null);
HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(),
HeartbeatData.class);
- ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
- ctx.channel(),
- heartbeatData.getClientID(),
- request.getLanguage(),
- request.getVersion()
- );
int heartbeatFingerprint = heartbeatData.getHeartbeatFingerprint();
if (heartbeatFingerprint != 0) {
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+ ctx.channel(),
+ heartbeatData.getClientID(),
+ request.getLanguage(),
+ request.getVersion()
+ );
return heartBeatV2(ctx, heartbeatData, clientChannelInfo,
response);
}
for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
@@ -122,6 +122,12 @@ public class ClientManageProcessor implements
NettyRequestProcessor {
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
subscriptionGroupConfig.getRetryQueueNums(),
PermName.PERM_WRITE | PermName.PERM_READ, hasOrderTopicSub,
topicSysFlag);
+ ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+ ctx.channel(),
+ heartbeatData.getClientID(),
+ request.getLanguage(),
+ request.getVersion()
+ );
boolean changed =
this.brokerController.getConsumerManager().registerConsumer(
consumerData.getGroupName(),
clientChannelInfo,
@@ -140,7 +146,12 @@ public class ClientManageProcessor implements
NettyRequestProcessor {
for (ProducerData data : heartbeatData.getProducerDataSet()) {
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
- clientChannelInfo);
+ new ClientChannelInfo(
+ ctx.channel(),
+ heartbeatData.getClientID(),
+ request.getLanguage(),
+ request.getVersion()
+ ));
}
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
index 874adb4d5f..ec7164247e 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -218,6 +218,63 @@ public class ClientManageProcessorTest {
return heartbeatData;
}
+ @Test
+ public void testHeartbeatMultiGroupChannelIndependentExpiry() throws
RemotingCommandException {
+ String groupA = "GroupA";
+ String groupB = "GroupB";
+
+ // Send heartbeat containing two consumer groups
+ RemotingCommand request = createMultiGroupHeartbeatCommand(groupA,
groupB);
+ RemotingCommand response =
clientManageProcessor.processRequest(handlerContext, request);
+ assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+ ConsumerGroupInfo groupAInfo =
brokerController.getConsumerManager().getConsumerGroupInfo(groupA);
+ ConsumerGroupInfo groupBInfo =
brokerController.getConsumerManager().getConsumerGroupInfo(groupB);
+ assertThat(groupAInfo).isNotNull();
+ assertThat(groupBInfo).isNotNull();
+
+ // Each group must hold an independent ClientChannelInfo instance
+ ClientChannelInfo channelInfoInA =
groupAInfo.getChannelInfoTable().get(channel);
+ ClientChannelInfo channelInfoInB =
groupBInfo.getChannelInfoTable().get(channel);
+ assertThat(channelInfoInA).isNotSameAs(channelInfoInB);
+
+ // Simulate time exceeding the expiry threshold
+ long expiredTs = System.currentTimeMillis()
+ - brokerController.getBrokerConfig().getChannelExpiredTimeout() *
2;
+ channelInfoInA.setLastUpdateTimestamp(expiredTs);
+ channelInfoInB.setLastUpdateTimestamp(expiredTs);
+
+ // Only groupA sends a subsequent heartbeat
+ RemotingCommand heartbeatOnlyA =
createMultiGroupHeartbeatCommand(groupA);
+ clientManageProcessor.processRequest(handlerContext, heartbeatOnlyA);
+
+ brokerController.getConsumerManager().scanNotActiveChannel();
+
+ // groupA should survive (just sent heartbeat)
+
assertThat(brokerController.getConsumerManager().getConsumerGroupInfo(groupA)).isNotNull();
+ // groupB should be evicted (no heartbeat renewal)
+
assertThat(brokerController.getConsumerManager().getConsumerGroupInfo(groupB)).isNull();
+ }
+
+ private RemotingCommand createMultiGroupHeartbeatCommand(String... groups)
{
+ HeartbeatData heartbeatData = new HeartbeatData();
+ heartbeatData.setClientID(clientId);
+ for (String g : groups) {
+ ConsumerData consumerData = createConsumerData(g);
+ SubscriptionData sub = new SubscriptionData();
+ sub.setTopic(topic);
+ sub.setSubString("*");
+ sub.setSubVersion(100L);
+ consumerData.getSubscriptionDataSet().add(sub);
+ heartbeatData.getConsumerDataSet().add(consumerData);
+ }
+ RemotingCommand request =
RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+ request.setLanguage(LanguageCode.JAVA);
+ request.setVersion(100);
+ request.setBody(heartbeatData.encode());
+ return request;
+ }
+
static ConsumerData createConsumerData(String group) {
ConsumerData consumerData = new ConsumerData();
consumerData.setGroupName(group);