This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 881507e89f [ISSUE #9115] Optimize the broker's reverse notification
for consumerId change (#9116)
881507e89f is described below
commit 881507e89f6d9905f4ebf6a1b3f84fb02c031a39
Author: yx9o <[email protected]>
AuthorDate: Fri Apr 11 15:38:07 2025 +0800
[ISSUE #9115] Optimize the broker's reverse notification for consumerId
change (#9116)
* [ISSUE #9115] Optimize the broker's reverse notification for consumerId
change
---
.../client/DefaultConsumerIdsChangeListener.java | 45 +++++++++++++++++++++-
1 file changed, 44 insertions(+), 1 deletion(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
index d17a2a5470..e046176956 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/client/DefaultConsumerIdsChangeListener.java
@@ -23,6 +23,8 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.AbstractBrokerRunnable;
import org.apache.rocketmq.common.constant.LoggerName;
@@ -41,6 +43,8 @@ public class DefaultConsumerIdsChangeListener implements
ConsumerIdsChangeListen
private ConcurrentHashMap<String,List<Channel>> consumerChannelMap = new
ConcurrentHashMap<>(cacheSize);
+ private final ConcurrentHashMap<String, NotifyTaskControl>
activeGroupNotifyMap = new ConcurrentHashMap<>();
+
public DefaultConsumerIdsChangeListener(BrokerController brokerController)
{
this.brokerController = brokerController;
@@ -70,9 +74,25 @@ public class DefaultConsumerIdsChangeListener implements
ConsumerIdsChangeListen
List<Channel> channels = (List<Channel>) args[0];
if (channels != null &&
brokerController.getBrokerConfig().isNotifyConsumerIdsChangedEnable()) {
if
(this.brokerController.getBrokerConfig().isRealTimeNotifyConsumerChange()) {
- for (Channel chl : channels) {
+ NotifyTaskControl currentNotifyTaskControl = new
NotifyTaskControl(channels);
+ activeGroupNotifyMap.compute(group, (k, oldVal) -> {
+ if (null != oldVal) {
+ oldVal.interrupt();
+ }
+ return currentNotifyTaskControl;
+ });
+
+ boolean isNormalCompletion = true;
+ for (Channel chl :
currentNotifyTaskControl.getChannels()) {
+ if (currentNotifyTaskControl.isInterrupted()) {
+ isNormalCompletion = false;
+ break;
+ }
this.brokerController.getBroker2Client().notifyConsumerIdsChanged(chl, group);
}
+ if (isNormalCompletion) {
+ activeGroupNotifyMap.computeIfPresent(group, (k,
val) -> val == currentNotifyTaskControl ? null : val);
+ }
} else {
consumerChannelMap.put(group, channels);
}
@@ -125,4 +145,27 @@ public class DefaultConsumerIdsChangeListener implements
ConsumerIdsChangeListen
public void shutdown() {
this.scheduledExecutorService.shutdown();
}
+
+ private static class NotifyTaskControl {
+
+ private final AtomicBoolean interrupted = new AtomicBoolean(false);
+
+ private final List<Channel> channels;
+
+ public NotifyTaskControl(List<Channel> channels) {
+ this.channels = channels;
+ }
+
+ public boolean isInterrupted() {
+ return interrupted.get();
+ }
+
+ public void interrupt() {
+ interrupted.set(true);
+ }
+
+ public List<Channel> getChannels() {
+ return channels;
+ }
+ }
}