This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 881507e89f [ISSUE #9115] Optimize the broker's reverse notification 
for consumerId change (#9116)
881507e89f is described below

commit 881507e89f6d9905f4ebf6a1b3f84fb02c031a39
Author: yx9o <[email protected]>
AuthorDate: Fri Apr 11 15:38:07 2025 +0800

    [ISSUE #9115] Optimize the broker's reverse notification for consumerId 
change (#9116)
    
    * [ISSUE #9115] Optimize the broker's reverse notification for consumerId 
change
---
 .../client/DefaultConsumerIdsChangeListener.java   | 45 +++++++++++++++++++++-
 1 file changed, 44 insertions(+), 1 deletion(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index d17a2a5470..e046176956 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -23,6 +23,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.common.AbstractBrokerRunnable;
 import org.apache.rocketmq.common.constant.LoggerName;
@@ -41,6 +43,8 @@ public class DefaultConsumerIdsChangeListener implements 
ConsumerIdsChangeListen
 
     private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new 
ConcurrentHashMap<>(cacheSize);
 
+    private final ConcurrentHashMap<String, NotifyTaskControl> 
activeGroupNotifyMap = new ConcurrentHashMap<>();
+
     public DefaultConsumerIdsChangeListener(BrokerController brokerController) 
{
         this.brokerController = brokerController;
 
@@ -70,9 +74,25 @@ public class DefaultConsumerIdsChangeListener implements 
ConsumerIdsChangeListen
                 List<Channel> channels = (List<Channel>) args[0];
                 if (channels != null && 
brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
                     if 
(this.brokerController.getBrokerConfig().isRealTimeNotifyConsumerChange()) {
-                        for (Channel chl : channels) {
+                        NotifyTaskControl currentNotifyTaskControl = new 
NotifyTaskControl(channels);
+                        activeGroupNotifyMap.compute(group, (k, oldVal) -> {
+                            if (null != oldVal) {
+                                oldVal.interrupt();
+                            }
+                            return currentNotifyTaskControl;
+                        });
+
+                        boolean isNormalCompletion = true;
+                        for (Channel chl : 
currentNotifyTaskControl.getChannels()) {
+                            if (currentNotifyTaskControl.isInterrupted()) {
+                                isNormalCompletion = false;
+                                break;
+                            }
                             
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
                         }
+                        if (isNormalCompletion) {
+                            activeGroupNotifyMap.computeIfPresent(group, (k, 
val) -> val == currentNotifyTaskControl ? null : val);
+                        }
                     } else {
                         consumerChannelMap.put(group, channels);
                     }
@@ -125,4 +145,27 @@ public class DefaultConsumerIdsChangeListener implements 
ConsumerIdsChangeListen
     public void shutdown() {
         this.scheduledExecutorService.shutdown();
     }
+
+    private static class NotifyTaskControl {
+
+        private final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+        private final List<Channel> channels;
+
+        public NotifyTaskControl(List<Channel> channels) {
+            this.channels = channels;
+        }
+
+        public boolean isInterrupted() {
+            return interrupted.get();
+        }
+
+        public void interrupt() {
+            interrupted.set(true);
+        }
+
+        public List<Channel> getChannels() {
+            return channels;
+        }
+    }
 }

Reply via email to