This is an automated email from the ASF dual-hosted git repository.

lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2a9560cc98 [ISSUE #10387] Support runtime hot-reload of 
maxClientEventCount in LiteEventDispatcher.ClientEventSet (#10388)
2a9560cc98 is described below

commit 2a9560cc986b4bf9aca23c38e8bb154c5159159c
Author: Quan <[email protected]>
AuthorDate: Tue Jun 2 20:10:44 2026 +0800

    [ISSUE #10387] Support runtime hot-reload of maxClientEventCount in 
LiteEventDispatcher.ClientEventSet (#10388)
---
 .../rocketmq/broker/lite/LiteEventDispatcher.java  | 34 +++++++++++++++++++---
 .../broker/lite/LiteEventDispatcherTest.java       |  6 ++--
 .../org/apache/rocketmq/common/BrokerConfig.java   | 10 +++++++
 3 files changed, 43 insertions(+), 7 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
index 8bdb2879df..1267856da6 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.tuple.Triple;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.ServiceThread;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.entity.ClientGroup;
@@ -445,15 +446,36 @@ public class LiteEventDispatcher extends ServiceThread {
         private final String group;
         private volatile long lastAccessTime = System.currentTimeMillis();
         private volatile long lastConsumeTime = System.currentTimeMillis();
+        /**
+         * Cache resolved max capacity to avoid per-offer 
SubscriptionGroupConfig lookup + attribute
+         * parsing on the hot dispatch path. Soft-cap semantics tolerate a 
short staleness window,
+         * so refresh lazily by TTL {@link 
BrokerConfig#getLiteEventCapacityCacheTtlMs()}.
+         */
+        private volatile int maxCapacityCache;
+        private volatile long capacityRefreshTime = System.currentTimeMillis();
 
         public ClientEventSet(String group) {
             this.group = group;
-            events = new 
LinkedBlockingQueue<>(LiteMetadataUtil.getMaxClientEventCount(group, 
brokerController));
+            // Use a large bounded queue as a hard ceiling; the effective 
capacity is enforced
+            // dynamically via soft-cap in offer() so that maxClientEventCount 
can be changed
+            // at runtime without restart.
+            this.events = new LinkedBlockingQueue<>(100_000);
+            this.maxCapacityCache = 
LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
+        }
+
+        private int getMaxCapacity() {
+            long now = System.currentTimeMillis();
+            long ttl = 
brokerController.getBrokerConfig().getLiteEventCapacityCacheTtlMs();
+            if (now - capacityRefreshTime > ttl) {
+                maxCapacityCache = 
LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
+                capacityRefreshTime = now;
+            }
+            return maxCapacityCache;
         }
 
         // return false if and only if the queue is full, has race condition 
with poll(), but no side effect.
         public boolean offer(String event) {
-            if (events.remainingCapacity() == 0) {
+            if (events.size() >= getMaxCapacity()) {
                 return false;
             }
             boolean rst;
@@ -486,7 +508,8 @@ public class LiteEventDispatcher extends ServiceThread {
 
         public boolean isLowWaterMark() {
             int used = events.size();
-            return (double) used / (used + events.remainingCapacity()) < 
LOW_WATER_MARK;
+            int maxCapacity = getMaxCapacity();
+            return maxCapacity <= 0 || (double) used / maxCapacity < 
LOW_WATER_MARK;
         }
 
         public boolean isActiveConsuming() {
@@ -516,7 +539,7 @@ public class LiteEventDispatcher extends ServiceThread {
         }
 
         /**
-         *  Mostly triggered when client channel closed, ensure that lite 
subscriptions is cleared before.
+         * Mostly triggered when client channel closed, ensure that lite 
subscriptions is cleared before.
          */
         @Override
         public void onRemoveAll(String clientId, String group) {
@@ -553,10 +576,12 @@ public class LiteEventDispatcher extends ServiceThread {
     static class LiteSubscriptionIterator implements Iterator<String> {
         private final Iterator<String> iterator;
         private final String parentTopic;
+
         public LiteSubscriptionIterator(String parentTopic, Iterator<String> 
iterator) {
             this.parentTopic = parentTopic;
             this.iterator = iterator;
         }
+
         @Override
         public boolean hasNext() {
             return iterator.hasNext();
@@ -572,6 +597,7 @@ public class LiteEventDispatcher extends ServiceThread {
         private final String clientId;
         private final String group;
         private final long timestamp;
+
         public FullDispatchRequest(String clientId, String group, long 
delayMillis) {
             this.clientId = clientId;
             this.group = group;
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
index 31d5562f92..36e4ae2378 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
@@ -580,9 +580,9 @@ public class LiteEventDispatcherTest {
         when(consumerOffsetManager.queryOffset(group, lmqName, 
0)).thenReturn(50L);
 
         LiteEventDispatcher.ClientEventSet eventSet = 
spy(liteEventDispatcher.new ClientEventSet(group));
-        when(eventSet.maybeBlock()).thenReturn(false);
-        when(eventSet.isLowWaterMark()).thenReturn(true);
-        when(eventSet.offer(lmqName)).thenReturn(true);
+        doReturn(false).when(eventSet).maybeBlock();
+        doReturn(true).when(eventSet).isLowWaterMark();
+        doReturn(true).when(eventSet).offer(lmqName);
 
         liteEventDispatcher.clientEventMap.put(clientId, eventSet);
 
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java 
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index c97ff2fc29..38644659e1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -558,6 +558,8 @@ public class BrokerConfig extends BrokerIdentity {
 
     private int maxClientEventCount = 100;
 
+    private long liteEventCapacityCacheTtlMs = 5000;
+
     private long liteEventFullDispatchDelayTime = 10 * 1000;
 
     private long liteEventFullDispatchDelayTimeForWildcardGroup = 10 * 1000;
@@ -2445,6 +2447,14 @@ public class BrokerConfig extends BrokerIdentity {
         this.maxClientEventCount = maxClientEventCount;
     }
 
+    public long getLiteEventCapacityCacheTtlMs() {
+        return liteEventCapacityCacheTtlMs;
+    }
+
+    public void setLiteEventCapacityCacheTtlMs(long 
liteEventCapacityCacheTtlMs) {
+        this.liteEventCapacityCacheTtlMs = liteEventCapacityCacheTtlMs;
+    }
+
     public long getLiteEventFullDispatchDelayTime() {
         return liteEventFullDispatchDelayTime;
     }

Reply via email to