Copilot commented on code in PR #10517:
URL: https://github.com/apache/rocketmq/pull/10517#discussion_r3410907548
##########
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java:
##########
@@ -43,6 +43,21 @@ public class ConsumerOffsetManager extends ConfigManager {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public static final String TOPIC_GROUP_SEPARATOR = "@";
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache = new ConcurrentHashMap<>();
+
+ private String buildTopicGroupKey(String topic, String group) {
+ ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);
+ if (groupMap != null) {
+ String cached = groupMap.get(group);
+ if (cached != null) {
+ return cached;
+ }
+ }
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ topicGroupKeyCache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(group, key);
Review Comment:
The `topicGroupKeyCache` grows without bound and retains an entry for every
`(topic, group)` pair ever observed, so it can become a long-lived memory sink
on brokers that serve many ephemeral groups or topics. Consider a bounded cache
(size- or TTL-limited, e.g., Caffeine), or avoid caching entirely unless
profiling shows that string construction here is a dominant cost.
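One way to bound the cache without an extra dependency is sketched below. It is a minimal illustration, not the PR's code: the class name `BoundedTopicGroupKeyCache`, the two size limits, and the `cachedEntries` helper are hypothetical, and it swaps the suggested Caffeine cache for nested access-ordered `LinkedHashMap`s that evict their least recently used entry. The coarse `synchronized` keeps the sketch simple but may cost more than the string concatenation it avoids, so it would need profiling before use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an LRU-bounded replacement for topicGroupKeyCache.
final class BoundedTopicGroupKeyCache {
    static final String TOPIC_GROUP_SEPARATOR = "@";

    private final int maxGroupsPerTopic;
    private final Map<String, Map<String, String>> byTopic;

    BoundedTopicGroupKeyCache(int maxTopics, int maxGroupsPerTopic) {
        this.maxGroupsPerTopic = maxGroupsPerTopic;
        this.byTopic = lru(maxTopics);
    }

    // Access-ordered LinkedHashMap that drops its eldest entry once it exceeds maxEntries.
    private static <V> Map<String, V> lru(final int maxEntries) {
        return new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // LinkedHashMap is not thread-safe, so every access goes through this monitor.
    synchronized String buildTopicGroupKey(String topic, String group) {
        Map<String, String> groups =
            byTopic.computeIfAbsent(topic, t -> BoundedTopicGroupKeyCache.<String>lru(maxGroupsPerTopic));
        return groups.computeIfAbsent(group, g -> topic + TOPIC_GROUP_SEPARATOR + g);
    }

    // Total number of cached keys; never exceeds maxTopics * maxGroupsPerTopic.
    synchronized int cachedEntries() {
        int n = 0;
        for (Map<String, String> groups : byTopic.values()) {
            n += groups.size();
        }
        return n;
    }
}
```

With limits of, say, 2 topics and 2 groups per topic, the cache never holds more than 4 keys regardless of how many `(topic, group)` pairs pass through it.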
##########
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java:
##########
@@ -58,11 +60,26 @@ public void suspendPullRequest(final String topic, final int queueId, final Pull
}
private String buildKey(final String topic, final int queueId) {
- StringBuilder sb = new StringBuilder(topic.length() + 5);
- sb.append(topic);
- sb.append(TOPIC_QUEUEID_SEPARATOR);
- sb.append(queueId);
- return sb.toString();
+ String[] keys = buildKeyCache.get(topic);
+ if (keys != null && queueId >= 0 && queueId < keys.length) {
+ String cached = keys[queueId];
+ if (cached != null) {
+ return cached;
+ }
+ }
+ String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
+ if (topic != null && queueId >= 0) {
+ int len = Math.max(queueId + 1, 16);
+ keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);
+ if (queueId >= keys.length) {
+ String[] grown = new String[queueId + 16];
Review Comment:
The array-sizing arithmetic can overflow for large `queueId` values
(`queueId + 1` / `queueId + 16`), potentially leading to
`NegativeArraySizeException`, and even without overflow can allocate very large
arrays (OOM risk) if `queueId` is unexpectedly high. Prefer a map-based cache
keyed by `queueId` (or cap `queueId`/array growth to a safe maximum derived
from configured queue counts) to avoid pathological allocations.
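The map-based alternative could look roughly like the sketch below. The class name `QueueKeyCache`, the cap `MAX_CACHED_QUEUE_ID`, and its value of 1024 are assumptions made for illustration (a real cap would come from the configured queue counts), and the separator value `"@"` is assumed as well. Out-of-range ids fall back to plain concatenation, so no allocation is ever sized from `queueId`.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: per-topic map keyed by queueId, with an upper bound on cached ids.
final class QueueKeyCache {
    static final String TOPIC_QUEUEID_SEPARATOR = "@"; // assumed value
    static final int MAX_CACHED_QUEUE_ID = 1024;        // assumed cap, not from the PR

    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, String>> cache =
        new ConcurrentHashMap<>();

    String buildKey(final String topic, final int queueId) {
        // Bypass the cache for null topics and for ids outside the safe range.
        if (topic == null || queueId < 0 || queueId > MAX_CACHED_QUEUE_ID) {
            return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
        }
        return cache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .computeIfAbsent(queueId, q -> topic + TOPIC_QUEUEID_SEPARATOR + q);
    }
}
```

For reference, with `queueId == Integer.MAX_VALUE` the expression `queueId + 16` wraps to a negative `int`, which is what makes `new String[queueId + 16]` throw `NegativeArraySizeException` in the array-based version.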
##########
broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java:
##########
@@ -41,10 +41,15 @@ public void run() {
long start = System.currentTimeMillis();
while (!this.isStopped()) {
try {
- if (System.currentTimeMillis() - start > brokerController.getBrokerConfig().getTransactionMetricFlushInterval()) {
+ // Bug fix: original code only called waitForRunning inside the if-branch,
+ // so on every iteration where the interval hadn't elapsed yet the loop
+ // spun without yielding (~170 CPU samples in JFR on idle broker).
+ // Now we always wait, then check whether enough time has passed to persist.
+ long interval = brokerController.getBrokerConfig().getTransactionMetricFlushInterval();
+ this.waitForRunning(interval);
+ if (System.currentTimeMillis() - start > interval) {
Review Comment:
Using `>` after an unconditional `waitForRunning(interval)` can occasionally
skip a persist if the elapsed time equals `interval` (millisecond granularity),
causing the flush cadence to drift toward ~2× the configured interval. Prefer
`>=` here, or compute/store `nextFlushTime = start + interval` and compare `now
>= nextFlushTime`.
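The deadline-based variant can be isolated into a small helper, sketched here under the assumption that the service loop passes in the current time after each wait. `FlushSchedule` and `shouldFlush` are hypothetical names, not part of the PR.

```java
// Hypothetical sketch: tracks the next flush deadline and compares with >=.
final class FlushSchedule {
    private final long intervalMillis;
    private long nextFlushTime;

    FlushSchedule(long startMillis, long intervalMillis) {
        this.intervalMillis = intervalMillis;
        this.nextFlushTime = startMillis + intervalMillis;
    }

    // Returns true when the deadline has been reached, then schedules the next one.
    boolean shouldFlush(long nowMillis) {
        if (nowMillis >= nextFlushTime) {
            nextFlushTime = nowMillis + intervalMillis;
            return true;
        }
        return false;
    }
}
```

In the service loop this would be called right after `waitForRunning(interval)` with `System.currentTimeMillis()` as the argument; a wake-up that lands exactly on the deadline then triggers a persist instead of waiting for another full interval.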
##########
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java:
##########
@@ -58,11 +60,26 @@ public void suspendPullRequest(final String topic, final int queueId, final Pull
}
private String buildKey(final String topic, final int queueId) {
- StringBuilder sb = new StringBuilder(topic.length() + 5);
- sb.append(topic);
- sb.append(TOPIC_QUEUEID_SEPARATOR);
- sb.append(queueId);
- return sb.toString();
+ String[] keys = buildKeyCache.get(topic);
Review Comment:
`buildKeyCache.get(topic)` will throw `NullPointerException` if `topic` is null
(since `ConcurrentHashMap` forbids null keys). The old implementation also threw
for a null `topic`, because it called `topic.length()`, so the exception itself
is not new; however, the later `topic != null` check in the new code is
unreachable for null input. If null `topic` is possible, guard before accessing
the cache (and bypass caching) or throw a clear argument error.
##########
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java:
##########
@@ -43,6 +43,21 @@ public class ConsumerOffsetManager extends ConfigManager {
protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
public static final String TOPIC_GROUP_SEPARATOR = "@";
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache = new ConcurrentHashMap<>();
+
+ private String buildTopicGroupKey(String topic, String group) {
+ ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);
Review Comment:
This change introduces a behavior regression for null `topic`/`group`:
`ConcurrentHashMap` does not allow null keys, so
`topicGroupKeyCache.get(topic)` / `groupMap.get(group)` will throw
`NullPointerException`. Previously, `topic + TOPIC_GROUP_SEPARATOR + group`
would produce a non-throwing string (e.g., "null@group"/"topic@null"). If
nulls are possible at call sites, handle them explicitly (e.g., bypass the
cache and use `String.valueOf(...)` concatenation) or throw a clear
`IllegalArgumentException`.
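A minimal sketch of the cache-bypass option follows, assuming the pre-change behavior (a key containing the text "null") should be preserved. The class name `NullSafeKeyBuilder` is hypothetical; the separator constant mirrors the one in the diff.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: null arguments skip the cache and keep the old concatenation result.
final class NullSafeKeyBuilder {
    static final String TOPIC_GROUP_SEPARATOR = "@";

    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> cache =
        new ConcurrentHashMap<>();

    String buildTopicGroupKey(String topic, String group) {
        if (topic == null || group == null) {
            // ConcurrentHashMap rejects null keys, so never touch the cache here.
            return String.valueOf(topic) + TOPIC_GROUP_SEPARATOR + String.valueOf(group);
        }
        return cache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .computeIfAbsent(group, g -> topic + TOPIC_GROUP_SEPARATOR + g);
    }
}
```

If null arguments are considered a caller bug instead, the first branch could throw `IllegalArgumentException` with a descriptive message; which of the two fits depends on the call sites, which are not shown in this hunk.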
##########
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java:
##########
@@ -58,11 +60,26 @@ public void suspendPullRequest(final String topic, final int queueId, final Pull
}
private String buildKey(final String topic, final int queueId) {
- StringBuilder sb = new StringBuilder(topic.length() + 5);
- sb.append(topic);
- sb.append(TOPIC_QUEUEID_SEPARATOR);
- sb.append(queueId);
- return sb.toString();
+ String[] keys = buildKeyCache.get(topic);
+ if (keys != null && queueId >= 0 && queueId < keys.length) {
+ String cached = keys[queueId];
+ if (cached != null) {
+ return cached;
+ }
+ }
+ String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
+ if (topic != null && queueId >= 0) {
+ int len = Math.max(queueId + 1, 16);
+ keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);
+ if (queueId >= keys.length) {
+ String[] grown = new String[queueId + 16];
+ System.arraycopy(keys, 0, grown, 0, keys.length);
+ buildKeyCache.put(topic, grown);
+ keys = grown;
+ }
+ keys[queueId] = key;
Review Comment:
The cached `String[]` is mutated (`keys[queueId] = key`) without any
synchronization/volatile semantics. Other threads may not reliably observe the
write, and concurrent growth/replacement can lose cache updates. If this cache
is accessed concurrently (likely), consider using `ConcurrentHashMap<Integer,
String>` per topic, or an `AtomicReferenceArray` plus a safe growth strategy,
to avoid racy publication.
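The `AtomicReferenceArray` option might be sketched as follows. To sidestep the growth race altogether, this version assumes a fixed number of slots per topic (`SLOTS_PER_TOPIC`, an illustrative value) and falls back to plain concatenation beyond it; the class name `SafeQueueKeyCache` and the separator value are likewise assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch: fixed-size atomic slots per topic, no array replacement.
final class SafeQueueKeyCache {
    static final String TOPIC_QUEUEID_SEPARATOR = "@"; // assumed value
    static final int SLOTS_PER_TOPIC = 64;              // assumed cap, not from the PR

    private final ConcurrentHashMap<String, AtomicReferenceArray<String>> cache =
        new ConcurrentHashMap<>();

    String buildKey(final String topic, final int queueId) {
        if (topic == null || queueId < 0 || queueId >= SLOTS_PER_TOPIC) {
            return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
        }
        AtomicReferenceArray<String> slots = cache.computeIfAbsent(
            topic, t -> new AtomicReferenceArray<String>(SLOTS_PER_TOPIC));
        String cached = slots.get(queueId); // volatile read
        if (cached != null) {
            return cached;
        }
        String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
        // CAS so that racing threads converge on a single cached instance.
        return slots.compareAndSet(queueId, null, key) ? key : slots.get(queueId);
    }
}
```

Because the array is never replaced after creation, there is no window in which a write lands in an array that another thread has already swapped out.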