This is an automated email from the ASF dual-hosted git repository.
lizhimins pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 2a9560cc98 [ISSUE #10387] Support runtime hot-reload of
maxClientEventCount in LiteEventDispatcher.ClientEventSet (#10388)
2a9560cc98 is described below
commit 2a9560cc986b4bf9aca23c38e8bb154c5159159c
Author: Quan <[email protected]>
AuthorDate: Tue Jun 2 20:10:44 2026 +0800
[ISSUE #10387] Support runtime hot-reload of maxClientEventCount in
LiteEventDispatcher.ClientEventSet (#10388)
---
.../rocketmq/broker/lite/LiteEventDispatcher.java | 34 +++++++++++++++++++---
.../broker/lite/LiteEventDispatcherTest.java | 6 ++--
.../org/apache/rocketmq/common/BrokerConfig.java | 10 +++++++
3 files changed, 43 insertions(+), 7 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
index 8bdb2879df..1267856da6 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/lite/LiteEventDispatcher.java
@@ -24,6 +24,7 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
+import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.entity.ClientGroup;
@@ -445,15 +446,36 @@ public class LiteEventDispatcher extends ServiceThread {
private final String group;
private volatile long lastAccessTime = System.currentTimeMillis();
private volatile long lastConsumeTime = System.currentTimeMillis();
+ /**
+ * Cache resolved max capacity to avoid per-offer
SubscriptionGroupConfig lookup + attribute
+ * parsing on the hot dispatch path. Soft-cap semantics tolerate a
short staleness window,
+ * so refresh lazily by TTL {@link
BrokerConfig#getLiteEventCapacityCacheTtlMs()}.
+ */
+ private volatile int maxCapacityCache;
+ private volatile long capacityRefreshTime = System.currentTimeMillis();
public ClientEventSet(String group) {
this.group = group;
- events = new
LinkedBlockingQueue<>(LiteMetadataUtil.getMaxClientEventCount(group,
brokerController));
+ // Use a large bounded queue as a hard ceiling; the effective
capacity is enforced
+ // dynamically via soft-cap in offer() so that maxClientEventCount
can be changed
+ // at runtime without restart.
+ this.events = new LinkedBlockingQueue<>(100_000);
+ this.maxCapacityCache =
LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
+ }
+
+ private int getMaxCapacity() {
+ long now = System.currentTimeMillis();
+ long ttl =
brokerController.getBrokerConfig().getLiteEventCapacityCacheTtlMs();
+ if (now - capacityRefreshTime > ttl) {
+ maxCapacityCache =
LiteMetadataUtil.getMaxClientEventCount(group, brokerController);
+ capacityRefreshTime = now;
+ }
+ return maxCapacityCache;
}
// return false if and only if the queue is full, has race condition
with poll(), but no side effect.
public boolean offer(String event) {
- if (events.remainingCapacity() == 0) {
+ if (events.size() >= getMaxCapacity()) {
return false;
}
boolean rst;
@@ -486,7 +508,8 @@ public class LiteEventDispatcher extends ServiceThread {
public boolean isLowWaterMark() {
int used = events.size();
- return (double) used / (used + events.remainingCapacity()) <
LOW_WATER_MARK;
+ int maxCapacity = getMaxCapacity();
+ return maxCapacity <= 0 || (double) used / maxCapacity <
LOW_WATER_MARK;
}
public boolean isActiveConsuming() {
@@ -516,7 +539,7 @@ public class LiteEventDispatcher extends ServiceThread {
}
/**
- * Mostly triggered when client channel closed, ensure that lite
subscriptions is cleared before.
+ * Mostly triggered when client channel closed, ensure that lite
subscriptions is cleared before.
*/
@Override
public void onRemoveAll(String clientId, String group) {
@@ -553,10 +576,12 @@ public class LiteEventDispatcher extends ServiceThread {
static class LiteSubscriptionIterator implements Iterator<String> {
private final Iterator<String> iterator;
private final String parentTopic;
+
public LiteSubscriptionIterator(String parentTopic, Iterator<String>
iterator) {
this.parentTopic = parentTopic;
this.iterator = iterator;
}
+
@Override
public boolean hasNext() {
return iterator.hasNext();
@@ -572,6 +597,7 @@ public class LiteEventDispatcher extends ServiceThread {
private final String clientId;
private final String group;
private final long timestamp;
+
public FullDispatchRequest(String clientId, String group, long
delayMillis) {
this.clientId = clientId;
this.group = group;
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
index 31d5562f92..36e4ae2378 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/lite/LiteEventDispatcherTest.java
@@ -580,9 +580,9 @@ public class LiteEventDispatcherTest {
when(consumerOffsetManager.queryOffset(group, lmqName,
0)).thenReturn(50L);
LiteEventDispatcher.ClientEventSet eventSet =
spy(liteEventDispatcher.new ClientEventSet(group));
- when(eventSet.maybeBlock()).thenReturn(false);
- when(eventSet.isLowWaterMark()).thenReturn(true);
- when(eventSet.offer(lmqName)).thenReturn(true);
+ doReturn(false).when(eventSet).maybeBlock();
+ doReturn(true).when(eventSet).isLowWaterMark();
+ doReturn(true).when(eventSet).offer(lmqName);
liteEventDispatcher.clientEventMap.put(clientId, eventSet);
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index c97ff2fc29..38644659e1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -558,6 +558,8 @@ public class BrokerConfig extends BrokerIdentity {
private int maxClientEventCount = 100;
+ private long liteEventCapacityCacheTtlMs = 5000;
+
private long liteEventFullDispatchDelayTime = 10 * 1000;
private long liteEventFullDispatchDelayTimeForWildcardGroup = 10 * 1000;
@@ -2445,6 +2447,14 @@ public class BrokerConfig extends BrokerIdentity {
this.maxClientEventCount = maxClientEventCount;
}
+ public long getLiteEventCapacityCacheTtlMs() {
+ return liteEventCapacityCacheTtlMs;
+ }
+
+ public void setLiteEventCapacityCacheTtlMs(long
liteEventCapacityCacheTtlMs) {
+ this.liteEventCapacityCacheTtlMs = liteEventCapacityCacheTtlMs;
+ }
+
public long getLiteEventFullDispatchDelayTime() {
return liteEventFullDispatchDelayTime;
}