This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 976e9a3edd [ISSUE #10435] Fix shared ClientChannelInfo preventing 
inactive channel expiry (#10437)
976e9a3edd is described below

commit 976e9a3edd591e28e190b8eedfbd531cb8fe8997
Author: Chuan <[email protected]>
AuthorDate: Thu Jun 11 15:36:39 2026 +0800

    [ISSUE #10435] Fix shared ClientChannelInfo preventing inactive channel 
expiry (#10437)
---
 .../broker/processor/ClientManageProcessor.java    | 25 +++++++---
 .../processor/ClientManageProcessorTest.java       | 57 ++++++++++++++++++++++
 2 files changed, 75 insertions(+), 7 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
index b51967e184..cc1f42e4e3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/ClientManageProcessor.java
@@ -78,14 +78,14 @@ public class ClientManageProcessor implements 
NettyRequestProcessor {
     public RemotingCommand heartBeat(ChannelHandlerContext ctx, 
RemotingCommand request) {
         RemotingCommand response = RemotingCommand.createResponseCommand(null);
         HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), 
HeartbeatData.class);
-        ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
-            ctx.channel(),
-            heartbeatData.getClientID(),
-            request.getLanguage(),
-            request.getVersion()
-        );
         int heartbeatFingerprint = heartbeatData.getHeartbeatFingerprint();
         if (heartbeatFingerprint != 0) {
+            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                ctx.channel(),
+                heartbeatData.getClientID(),
+                request.getLanguage(),
+                request.getVersion()
+            );
             return heartBeatV2(ctx, heartbeatData, clientChannelInfo, 
response);
         }
         for (ConsumerData consumerData : heartbeatData.getConsumerDataSet()) {
@@ -122,6 +122,12 @@ public class ClientManageProcessor implements 
NettyRequestProcessor {
             
this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
 subscriptionGroupConfig.getRetryQueueNums(),
                 PermName.PERM_WRITE | PermName.PERM_READ, hasOrderTopicSub, 
topicSysFlag);
 
+            ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
+                ctx.channel(),
+                heartbeatData.getClientID(),
+                request.getLanguage(),
+                request.getVersion()
+            );
             boolean changed = 
this.brokerController.getConsumerManager().registerConsumer(
                 consumerData.getGroupName(),
                 clientChannelInfo,
@@ -140,7 +146,12 @@ public class ClientManageProcessor implements 
NettyRequestProcessor {
 
         for (ProducerData data : heartbeatData.getProducerDataSet()) {
             
this.brokerController.getProducerManager().registerProducer(data.getGroupName(),
-                clientChannelInfo);
+                new ClientChannelInfo(
+                    ctx.channel(),
+                    heartbeatData.getClientID(),
+                    request.getLanguage(),
+                    request.getVersion()
+                ));
         }
         response.setCode(ResponseCode.SUCCESS);
         response.setRemark(null);
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
index 874adb4d5f..ec7164247e 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/ClientManageProcessorTest.java
@@ -218,6 +218,63 @@ public class ClientManageProcessorTest {
         return heartbeatData;
     }
 
+    @Test
+    public void testHeartbeatMultiGroupChannelIndependentExpiry() throws 
RemotingCommandException {
+        String groupA = "GroupA";
+        String groupB = "GroupB";
+
+        // Send heartbeat containing two consumer groups
+        RemotingCommand request = createMultiGroupHeartbeatCommand(groupA, 
groupB);
+        RemotingCommand response = 
clientManageProcessor.processRequest(handlerContext, request);
+        assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
+
+        ConsumerGroupInfo groupAInfo = 
brokerController.getConsumerManager().getConsumerGroupInfo(groupA);
+        ConsumerGroupInfo groupBInfo = 
brokerController.getConsumerManager().getConsumerGroupInfo(groupB);
+        assertThat(groupAInfo).isNotNull();
+        assertThat(groupBInfo).isNotNull();
+
+        // Each group must hold an independent ClientChannelInfo instance
+        ClientChannelInfo channelInfoInA = 
groupAInfo.getChannelInfoTable().get(channel);
+        ClientChannelInfo channelInfoInB = 
groupBInfo.getChannelInfoTable().get(channel);
+        assertThat(channelInfoInA).isNotSameAs(channelInfoInB);
+
+        // Simulate time exceeding the expiry threshold
+        long expiredTs = System.currentTimeMillis()
+            - brokerController.getBrokerConfig().getChannelExpiredTimeout() * 
2;
+        channelInfoInA.setLastUpdateTimestamp(expiredTs);
+        channelInfoInB.setLastUpdateTimestamp(expiredTs);
+
+        // Only groupA sends a subsequent heartbeat
+        RemotingCommand heartbeatOnlyA = 
createMultiGroupHeartbeatCommand(groupA);
+        clientManageProcessor.processRequest(handlerContext, heartbeatOnlyA);
+
+        brokerController.getConsumerManager().scanNotActiveChannel();
+
+        // groupA should survive (just sent heartbeat)
+        
assertThat(brokerController.getConsumerManager().getConsumerGroupInfo(groupA)).isNotNull();
+        // groupB should be evicted (no heartbeat renewal)
+        
assertThat(brokerController.getConsumerManager().getConsumerGroupInfo(groupB)).isNull();
+    }
+
+    private RemotingCommand createMultiGroupHeartbeatCommand(String... groups) 
{
+        HeartbeatData heartbeatData = new HeartbeatData();
+        heartbeatData.setClientID(clientId);
+        for (String g : groups) {
+            ConsumerData consumerData = createConsumerData(g);
+            SubscriptionData sub = new SubscriptionData();
+            sub.setTopic(topic);
+            sub.setSubString("*");
+            sub.setSubVersion(100L);
+            consumerData.getSubscriptionDataSet().add(sub);
+            heartbeatData.getConsumerDataSet().add(consumerData);
+        }
+        RemotingCommand request = 
RemotingCommand.createRequestCommand(RequestCode.HEART_BEAT, null);
+        request.setLanguage(LanguageCode.JAVA);
+        request.setVersion(100);
+        request.setBody(heartbeatData.encode());
+        return request;
+    }
+
     static ConsumerData createConsumerData(String group) {
         ConsumerData consumerData = new ConsumerData();
         consumerData.setGroupName(group);

Reply via email to