This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push: new 921664f400 [ISSUE #9443] Fix add wrong value to topicGroupTable in ConsumerManager (#9444) 921664f400 is described below commit 921664f40063b431c767725deaad3dc53aa628b7 Author: yx9o <yangx_s...@163.com> AuthorDate: Tue Jun 3 10:02:13 2025 +0800 [ISSUE #9443] Fix add wrong value to topicGroupTable in ConsumerManager (#9444) --- .../rocketmq/broker/client/ConsumerManager.java | 2 +- .../broker/client/ConsumerManagerTest.java | 25 ++++++++++++++++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java index 5aec8e577f..341bbb5dad 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java @@ -292,7 +292,7 @@ public class ConsumerManager { Set<String> prev = this.topicGroupTable.putIfAbsent(subscriptionData.getTopic(), tmp); groups = prev != null ? 
prev : tmp; } - groups.add(subscriptionData.getTopic()); + groups.add(group); } boolean updateChannelRst = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere); diff --git a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java index 2afd071b5e..1b8293159d 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/client/ConsumerManagerTest.java @@ -37,6 +37,7 @@ import org.mockito.junit.MockitoJUnitRunner; import java.util.HashSet; import java.util.Set; +import static org.apache.rocketmq.remoting.protocol.heartbeat.ConsumeType.CONSUME_PASSIVELY; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -115,7 +116,7 @@ public class ConsumerManagerTest { final Set<SubscriptionData> subList = new HashSet<>(); SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*"); subList.add(subscriptionData); - consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY, + consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY, MessageModel.BROADCASTING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true); verify(consumerManager, never()).callConsumerIdsChangeListener(eq(ConsumerGroupEvent.CHANGE), any(), any()); assertThat(consumerManager.getConsumerTable().get(GROUP)).isNotNull(); @@ -195,7 +196,7 @@ public class ConsumerManagerTest { final Set<SubscriptionData> subList = new HashSet<>(); SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*"); subList.add(subscriptionData); - consumerManager.registerConsumer(GROUP, clientChannelInfo, ConsumeType.CONSUME_PASSIVELY, + consumerManager.registerConsumer(GROUP, clientChannelInfo, CONSUME_PASSIVELY, MessageModel.BROADCASTING, 
ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, subList, true); } @@ -210,4 +211,24 @@ public class ConsumerManagerTest { assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC)).isNull(); assertThat(consumerManager.findSubscriptionData(GROUP, TOPIC + "_1")).isNotNull(); } + + @Test + public void testRegisterConsumerWithoutSub() { + ConsumerGroupInfo groupInfo = new ConsumerGroupInfo(GROUP, CONSUME_PASSIVELY, + MessageModel.CLUSTERING, ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); + SubscriptionData subscriptionData = new SubscriptionData(TOPIC, "*"); + groupInfo.getSubscriptionTable().put(TOPIC, subscriptionData); + consumerManager.getConsumerTable().put(GROUP, groupInfo); + + consumerManager.registerConsumerWithoutSub(GROUP, + clientChannelInfo, + CONSUME_PASSIVELY, + MessageModel.CLUSTERING, + ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET, + true); + + Set<String> actual = consumerManager.queryTopicConsumeByWho(TOPIC); + assertThat(actual).contains(GROUP); + assertThat(actual).doesNotContain(TOPIC); + } }