Copilot commented on code in PR #10517:
URL: https://github.com/apache/rocketmq/pull/10517#discussion_r3410907548


##########
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java:
##########
@@ -43,6 +43,21 @@ public class ConsumerOffsetManager extends ConfigManager {
     protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     public static final String TOPIC_GROUP_SEPARATOR = "@";
 
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache = new ConcurrentHashMap<>();
+
+    private String buildTopicGroupKey(String topic, String group) {
+        ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);
+        if (groupMap != null) {
+            String cached = groupMap.get(group);
+            if (cached != null) {
+                return cached;
+            }
+        }
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        topicGroupKeyCache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(group, key);

Review Comment:
   The `topicGroupKeyCache` grows without bounds and retains an entry for every 
   `(topic, group)` pair ever observed, which can become a long-lived memory sink 
   on brokers that serve many ephemeral groups/topics. Consider a size- or 
   TTL-bounded cache (e.g., Caffeine), or avoid caching entirely unless profiling 
   shows that string construction here is a dominant cost.
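
   As a rough illustration of the size-bounded option, the sketch below uses 
   only the JDK (`LinkedHashMap` in access order with `removeEldestEntry`) 
   rather than Caffeine. The class name `BoundedTopicGroupKeyCache`, both limits, 
   and the single coarse lock are assumptions made for the example, not part of 
   the PR.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: an LRU-bounded replacement for topicGroupKeyCache.
final class BoundedTopicGroupKeyCache {
    static final String TOPIC_GROUP_SEPARATOR = "@";

    private final int maxGroupsPerTopic; // assumed limit, not from the PR
    private final Map<String, Map<String, String>> cache;

    BoundedTopicGroupKeyCache(int maxTopics, int maxGroupsPerTopic) {
        this.maxGroupsPerTopic = maxGroupsPerTopic;
        this.cache = lruMap(maxTopics);
    }

    // Access-ordered LinkedHashMap that evicts the least recently used entry
    // once it holds more than maxEntries entries.
    private static <K, V> Map<K, V> lruMap(final int maxEntries) {
        return new LinkedHashMap<K, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // One coarse lock keeps the sketch simple; LinkedHashMap is not thread-safe.
    synchronized String buildTopicGroupKey(String topic, String group) {
        Map<String, String> groups =
            cache.computeIfAbsent(topic, t -> lruMap(maxGroupsPerTopic));
        return groups.computeIfAbsent(group, g -> topic + TOPIC_GROUP_SEPARATOR + g);
    }

    synchronized int cachedTopics() {
        return cache.size();
    }
}
```

   The lock serializes every lookup, so whether this beats plain concatenation 
   would still need to be measured.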



##########
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java:
##########
@@ -58,11 +60,26 @@ public void suspendPullRequest(final String topic, final int queueId, final Pull
     }
 
     private String buildKey(final String topic, final int queueId) {
-        StringBuilder sb = new StringBuilder(topic.length() + 5);
-        sb.append(topic);
-        sb.append(TOPIC_QUEUEID_SEPARATOR);
-        sb.append(queueId);
-        return sb.toString();
+        String[] keys = buildKeyCache.get(topic);
+        if (keys != null && queueId >= 0 && queueId < keys.length) {
+            String cached = keys[queueId];
+            if (cached != null) {
+                return cached;
+            }
+        }
+        String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
+        if (topic != null && queueId >= 0) {
+            int len = Math.max(queueId + 1, 16);
+            keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);
+            if (queueId >= keys.length) {
+                String[] grown = new String[queueId + 16];

Review Comment:
   The array-sizing arithmetic can overflow for large `queueId` values 
(`queueId + 1` / `queueId + 16`), potentially leading to 
`NegativeArraySizeException`, and even without overflow can allocate very large 
arrays (OOM risk) if `queueId` is unexpectedly high. Prefer a map-based cache 
keyed by `queueId` (or cap `queueId`/array growth to a safe maximum derived 
from configured queue counts) to avoid pathological allocations.
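
   A minimal sketch of the map-based alternative follows. Because nothing is 
   sized from `queueId`, there is no `queueId + 16` arithmetic to overflow, and 
   ids above a cap skip the cache. The class name `CappedQueueKeyCache`, the 
   `maxCachedQueueId` parameter, and the `"@"` separator value are assumptions 
   for illustration.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: per-topic map keyed by queueId instead of a String[].
final class CappedQueueKeyCache {
    static final String TOPIC_QUEUEID_SEPARATOR = "@"; // assumed value

    private final int maxCachedQueueId; // assumed cap, e.g. from configured queue counts
    private final ConcurrentHashMap<String, ConcurrentHashMap<Integer, String>> cache =
        new ConcurrentHashMap<>();

    CappedQueueKeyCache(int maxCachedQueueId) {
        this.maxCachedQueueId = maxCachedQueueId;
    }

    String buildKey(final String topic, final int queueId) {
        // Out-of-range or null inputs bypass the cache, so no allocation
        // is ever proportional to queueId.
        if (topic == null || queueId < 0 || queueId > maxCachedQueueId) {
            return topic + TOPIC_QUEUEID_SEPARATOR + queueId;
        }
        return cache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<Integer, String>())
            .computeIfAbsent(queueId, q -> topic + TOPIC_QUEUEID_SEPARATOR + q);
    }

    int cachedEntries(String topic) {
        ConcurrentHashMap<Integer, String> perTopic = cache.get(topic);
        return perTopic == null ? 0 : perTopic.size();
    }
}
```

   The trade-off is one boxed `Integer` lookup per call instead of an array 
   index.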



##########
broker/src/main/java/org/apache/rocketmq/broker/transaction/TransactionMetricsFlushService.java:
##########
@@ -41,10 +41,15 @@ public void run() {
         long start = System.currentTimeMillis();
         while (!this.isStopped()) {
             try {
-                if (System.currentTimeMillis() - start > brokerController.getBrokerConfig().getTransactionMetricFlushInterval()) {
+                // Bug fix: original code only called waitForRunning inside the if-branch,
+                // so on every iteration where the interval hadn't elapsed yet the loop
+                // spun without yielding (~170 CPU samples in JFR on idle broker).
+                // Now we always wait, then check whether enough time has passed to persist.
+                long interval = brokerController.getBrokerConfig().getTransactionMetricFlushInterval();
+                this.waitForRunning(interval);
+                if (System.currentTimeMillis() - start > interval) {

Review Comment:
   Using `>` after an unconditional `waitForRunning(interval)` can occasionally 
   skip a persist if the elapsed time equals `interval` (millisecond granularity), 
   causing the flush cadence to drift toward ~2× the configured interval. Prefer 
   `>=` here, or compute/store `nextFlushTime = start + interval` and compare 
   `now >= nextFlushTime`.
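
   The `nextFlushTime` variant could look roughly like the sketch below. 
   `FlushSchedule` and `shouldFlush` are hypothetical names, and the clock value 
   is passed in so the boundary case (`now` exactly equal to the deadline) is 
   easy to see; the service loop itself is not reproduced.

```java
// Hypothetical sketch: deadline-based flush check using >= instead of >.
final class FlushSchedule {
    private final long intervalMillis;
    private long nextFlushTime;

    FlushSchedule(long startMillis, long intervalMillis) {
        this.intervalMillis = intervalMillis;
        this.nextFlushTime = startMillis + intervalMillis;
    }

    // Returns true when a flush is due at nowMillis and moves the deadline
    // one interval past nowMillis; returns false otherwise.
    boolean shouldFlush(long nowMillis) {
        if (nowMillis >= nextFlushTime) {
            nextFlushTime = nowMillis + intervalMillis;
            return true;
        }
        return false;
    }
}
```

   With a 1000 ms interval starting at 0, a wake-up at exactly 1000 ms flushes 
   here, whereas a strict `>` comparison would wait for the next iteration.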



##########
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java:
##########
@@ -58,11 +60,26 @@ public void suspendPullRequest(final String topic, final int queueId, final Pull
     }
 
     private String buildKey(final String topic, final int queueId) {
-        StringBuilder sb = new StringBuilder(topic.length() + 5);
-        sb.append(topic);
-        sb.append(TOPIC_QUEUEID_SEPARATOR);
-        sb.append(queueId);
-        return sb.toString();
+        String[] keys = buildKeyCache.get(topic);

Review Comment:
   `buildKeyCache.get(topic)` will throw `NullPointerException` if `topic` is 
   null, since `ConcurrentHashMap` forbids null keys. Note that the removed code 
   also dereferenced `topic` (`topic.length()`), so a null `topic` already 
   failed there rather than producing a string containing "null"; what changes 
   is where the exception is raised. If null `topic` is possible, guard before 
   accessing the cache (and bypass caching) or throw a clear argument error.
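
   To make the failure mode concrete, the sketch below contrasts an unguarded 
   `ConcurrentHashMap` lookup with one that fails fast with an explicit message. 
   `NullTopicGuard` and its method names are hypothetical and exist only for 
   this example.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: explicit argument check before touching the cache.
final class NullTopicGuard {
    private final ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();

    // Unguarded: ConcurrentHashMap.get(null) throws NullPointerException.
    String unguardedLookup(String topic) {
        return cache.get(topic);
    }

    // Guarded: a null topic is rejected with a descriptive exception
    // before the map is consulted.
    String guardedLookup(String topic) {
        if (topic == null) {
            throw new IllegalArgumentException("topic must not be null");
        }
        return cache.get(topic);
    }
}
```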



##########
broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java:
##########
@@ -43,6 +43,21 @@ public class ConsumerOffsetManager extends ConfigManager {
     protected static final Logger LOG = LoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
     public static final String TOPIC_GROUP_SEPARATOR = "@";
 
+    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache = new ConcurrentHashMap<>();
+
+    private String buildTopicGroupKey(String topic, String group) {
+        ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);

Review Comment:
   This change introduces a behavior regression for null `topic`/`group`: 
   `ConcurrentHashMap` does not allow null keys, so 
   `topicGroupKeyCache.get(topic)` / `groupMap.get(group)` will throw 
   `NullPointerException`. Previously, `topic + TOPIC_GROUP_SEPARATOR + group` 
   would produce a string without throwing (e.g., "null@group"/"topic@null"). If 
   nulls are possible at call sites, handle them explicitly (e.g., bypass the 
   cache and use `String.valueOf(...)` concatenation) or throw a clear 
   `IllegalArgumentException`.
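
   One way to keep the earlier null behavior while still caching non-null pairs 
   is sketched below. The class name `NullTolerantTopicGroupKey` is 
   hypothetical, and the cache is left unbounded to keep the example short.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: null inputs bypass the cache instead of throwing.
final class NullTolerantTopicGroupKey {
    static final String TOPIC_GROUP_SEPARATOR = "@";

    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> cache =
        new ConcurrentHashMap<>();

    String buildTopicGroupKey(final String topic, final String group) {
        if (topic == null || group == null) {
            // Same result as plain concatenation: "null" stands in for null.
            return String.valueOf(topic) + TOPIC_GROUP_SEPARATOR + String.valueOf(group);
        }
        return cache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<String, String>())
            .computeIfAbsent(group, g -> topic + TOPIC_GROUP_SEPARATOR + g);
    }
}
```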



##########
broker/src/main/java/org/apache/rocketmq/broker/longpolling/PullRequestHoldService.java:
##########
@@ -58,11 +60,26 @@ public void suspendPullRequest(final String topic, final int queueId, final Pull
     }
 
     private String buildKey(final String topic, final int queueId) {
-        StringBuilder sb = new StringBuilder(topic.length() + 5);
-        sb.append(topic);
-        sb.append(TOPIC_QUEUEID_SEPARATOR);
-        sb.append(queueId);
-        return sb.toString();
+        String[] keys = buildKeyCache.get(topic);
+        if (keys != null && queueId >= 0 && queueId < keys.length) {
+            String cached = keys[queueId];
+            if (cached != null) {
+                return cached;
+            }
+        }
+        String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
+        if (topic != null && queueId >= 0) {
+            int len = Math.max(queueId + 1, 16);
+            keys = buildKeyCache.computeIfAbsent(topic, t -> new String[len]);
+            if (queueId >= keys.length) {
+                String[] grown = new String[queueId + 16];
+                System.arraycopy(keys, 0, grown, 0, keys.length);
+                buildKeyCache.put(topic, grown);
+                keys = grown;
+            }
+            keys[queueId] = key;

Review Comment:
   The cached `String[]` is mutated (`keys[queueId] = key`) without any 
   synchronization/volatile semantics. Other threads may not reliably observe the 
   write, and concurrent growth/replacement can lose cache updates. If this cache 
   is accessed concurrently (likely), consider using 
   `ConcurrentHashMap<Integer, String>` per topic, or an `AtomicReferenceArray` 
   plus a safe growth strategy, to avoid racy publication.
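
   A sketch of the `AtomicReferenceArray` option is shown below. It sidesteps 
   growth by using a fixed per-topic capacity and falling back to plain 
   concatenation outside that range, which is a simplification rather than the 
   growth strategy itself. `AtomicQueueKeyCache`, `capacity`, and the `"@"` 
   separator value are assumptions.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical sketch: safely published per-topic slots, no array replacement.
final class AtomicQueueKeyCache {
    static final String TOPIC_QUEUEID_SEPARATOR = "@"; // assumed value

    private final int capacity; // assumed fixed number of cached queue ids per topic
    private final ConcurrentHashMap<String, AtomicReferenceArray<String>> cache =
        new ConcurrentHashMap<>();

    AtomicQueueKeyCache(int capacity) {
        this.capacity = capacity;
    }

    String buildKey(final String topic, final int queueId) {
        if (topic == null || queueId < 0 || queueId >= capacity) {
            return topic + TOPIC_QUEUEID_SEPARATOR + queueId; // uncached path
        }
        AtomicReferenceArray<String> keys =
            cache.computeIfAbsent(topic, t -> new AtomicReferenceArray<String>(capacity));
        String cached = keys.get(queueId); // volatile read
        if (cached != null) {
            return cached;
        }
        String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
        // compareAndSet publishes the value; if another thread won the race,
        // return its instance so all callers share one String.
        return keys.compareAndSet(queueId, null, key) ? key : keys.get(queueId);
    }
}
```

   Since the array is never replaced, there is no window in which a concurrent 
   writer can store into a stale copy.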


