This is an automated email from the ASF dual-hosted git repository.
zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 933ffc0b96 [ISSUE #8352] Fix CLIENT_REGISTER in registerConsumer (#8353)
933ffc0b96 is described below
commit 933ffc0b968d39329e8379b1ddc2fe3575550fda
Author: Zhouxiang Zhan <[email protected]>
AuthorDate: Wed Jul 3 13:47:00 2024 +0800
[ISSUE #8352] Fix CLIENT_REGISTER in registerConsumer (#8353)
---
.../java/org/apache/rocketmq/broker/client/ConsumerManager.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
index 42e71e7e99..9f838b5154 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/client/ConsumerManager.java
@@ -178,8 +178,6 @@ public class ConsumerManager {
long start = System.currentTimeMillis();
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
- callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER,
group, clientChannelInfo,
-
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType,
messageModel, consumeFromWhere);
ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group,
tmp);
consumerGroupInfo = prev != null ? prev : tmp;
@@ -188,6 +186,10 @@ public class ConsumerManager {
boolean r1 =
consumerGroupInfo.updateChannel(clientChannelInfo, consumeType,
messageModel,
consumeFromWhere);
+ if (r1) {
+ callConsumerIdsChangeListener(ConsumerGroupEvent.CLIENT_REGISTER,
group, clientChannelInfo,
+
subList.stream().map(SubscriptionData::getTopic).collect(Collectors.toSet()));
+ }
boolean r2 = false;
if (updateSubscription) {
r2 = consumerGroupInfo.updateSubscription(subList);