This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 921664f400 [ISSUE #9443] Fix add wrong value to topicGroupTable in ConsumerManager (#9444)
921664f400 is described below

commit 921664f40063b431c767725deaad3dc53aa628b7
Author: yx9o <yangx_s...@163.com>
AuthorDate: Tue Jun 3 10:02:13 2025 +0800

    [ISSUE #9443] Fix add wrong value to topicGroupTable in ConsumerManager (#9444)
---
 .../rocketmq/broker/client/ConsumerManager.java    |  2 +-
 .../broker/client/ConsumerManagerTest.java         | 25 ++++++++++++++++++++--
 2 files changed, 24 insertions(+), 3 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 5aec8e577f..341bbb5dad 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -292,7 +292,7 @@ public class ConsumerManager {
                 Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp);
                 groups = prev != null ? prev : tmp;
             }
-            groups.add(subscriptionData.getTopic());
+            groups.add(group);
         }
 
         boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
index 2afd071b5e..1b8293159d 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java
@@ -37,6 +37,7 @@ import org.mockito.junit.MockitoJUnitRunner;
 import java.util.HashSet;
 import java.util.Set;
 
+import static org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType.CONSUME_PASSIVELY;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -115,7 +116,7 @@ public class ConsumerManagerTest {
         final Set<SubscriptionData> subList = new HashSet<>();
         SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
         subList.add(subscriptionData);
-        consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY,
+        consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY,
             MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
         verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any());
         assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull();
@@ -195,7 +196,7 @@ public class ConsumerManagerTest {
         final Set<SubscriptionData> subList = new HashSet<>();
         SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
         subList.add(subscriptionData);
-        consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY,
+        consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY,
             MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true);
     }
 
@@ -210,4 +211,24 @@ public class ConsumerManagerTest {
         assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull();
         assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull();
     }
+    
+    @Test
+    public void testRegisterConsumerWithoutSub() {
+        ConsumerGroupInfo groupInfo = new ConsumerGroupInfo(GROUP, CONSUME_PASSIVELY,
+                MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*");
+        groupInfo.getSubscriptionTable().put(TOPIC, subscriptionData);
+        consumerManager.getConsumerTable().put(GROUP, groupInfo);
+        
+        consumerManager.registerConsumerWithoutSub(GROUP,
+                clientChannelInfo,
+                CONSUME_PASSIVELY,
+                MessageModel.CLUSTERING,
+                ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET,
+                true);
+        
+        Set<String> actual = consumerManager.queryTopicConsumeByWho(TOPIC);
+        assertThat(actual).contains(GROUP);
+        assertThat(actual).doesNotContain(TOPIC);
+    }
 }

Reply via email to