This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 933ffc0b96 [ISSUE #8352] Fix CLIENT_REGISTER in registerConsumer (#8353)
933ffc0b96 is described below

commit 933ffc0b968d39329e8379b1ddc2fe3575550fda
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Jul 3 13:47:00 2024 +0800

    [ISSUE #8352] Fix CLIENT_REGISTER in registerConsumer (#8353)
---
 .../java/org/apache/rocketmq/broker/client/ConsumerManager.java     | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 42e71e7e99..9f838b5154 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -178,8 +178,6 @@ public class ConsumerManager {
         long start = System.currentTimeMillis();
         ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
         if (null == consumerGroupInfo) {
-            callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, 
group, clientChannelInfo,
-                
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
             ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, 
messageModel, consumeFromWhere);
             ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, 
tmp);
             consumerGroupInfo = prev != null ? prev : tmp;
@@ -188,6 +186,10 @@ public class ConsumerManager {
         boolean r1 =
             consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, 
messageModel,
                 consumeFromWhere);
+        if (r1) {
+            callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER, 
group, clientChannelInfo,
+                
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
+        }
         boolean r2 = false;
         if (updateSubscription) {
             r2 = consumerGroupInfo.updateSubscription(subList);

Reply via email to