wang-jiahua opened a new issue, #10515:
URL: https://github.com/apache/rocketmq/issues/10515

   ### Before Creating the Enhancement Request
   
   - [x] I have confirmed that this should be classified as an enhancement rather than a bug/feature.
   
   ### Summary
   
   Three independent optimizations in the broker layer. Two remove per-message/per-request allocations, and one fixes a CPU-wasting busy spin:
   
   1. **`ConsumerOffsetManager`**: Cache `topic + "@" + group` string keys in a two-level `ConcurrentHashMap` to eliminate per-call `String` concatenation.
   2. **`PullRequestHoldService.buildKey()`**: Cache `topic + "@" + queueId` string keys in a per-topic `String[]` array indexed by queueId to eliminate the per-call `StringBuilder` + `toString()` allocation.
   3. **`TransactionMetricsFlushService`**: Fix a busy-spin bug where `waitForRunning()` was only called inside the `if` branch, so the loop spun without yielding whenever the interval had not yet elapsed (~170 CPU samples on an idle broker in JFR).
   
   ### Motivation
   
   Profiling a broker under steady-state load with JFR (`settings=profile`) reveals:
   
   - `ConsumerOffsetManager.commitOffset()` evaluates `topic + "@" + group` on every consumer offset commit. A stable set of (topic, group) pairs generates the same key string over and over, and each concatenation allocates a `StringBuilder`, a `String`, and a `byte[]`.
   - `PullRequestHoldService.buildKey()` creates a `StringBuilder` and calls `toString()` on every pull request hold/check, following the same stable key pattern.
   - `TransactionMetricsFlushService.run()` has a correctness bug: `waitForRunning()` is inside the `if` branch, so while the interval has not elapsed the `while` loop spins without sleeping, wasting CPU on a background service thread.
   
   ### Describe the Solution You'd Like
   
   **1. ConsumerOffsetManager — topicGroupKeyCache**
   
   ```java
   // topic -> (group -> "topic@group")
   private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache =
       new ConcurrentHashMap<>();
   
   private String buildTopicGroupKey(String topic, String group) {
       ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);
       if (groupMap != null) {
           String cached = groupMap.get(group);
           if (cached != null) {
               return cached; // cache hit: no allocation
           }
       }
       // Cache miss: build the key once and remember it for later calls.
       String key = topic + TOPIC_GROUP_SEPARATOR + group;
       topicGroupKeyCache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(group, key);
       return key;
   }
   ```
   
   Replace all inline `topic + TOPIC_GROUP_SEPARATOR + group` concatenations with `buildTopicGroupKey(topic, group)`.
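   
   To illustrate where the saving comes from, here is a minimal self-contained sketch that copies the proposed `buildTopicGroupKey()` into a standalone class and compares object identity. The class name `TopicGroupKeyCacheDemo` and the sample topic/group names are hypothetical. On a cache hit the same `String` instance comes back, while plain runtime concatenation builds a new one every time.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   
   // Hypothetical standalone class for illustration; only buildTopicGroupKey() follows the issue.
   public class TopicGroupKeyCacheDemo {
       static final String TOPIC_GROUP_SEPARATOR = "@";
   
       // topic -> (group -> "topic@group")
       private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> topicGroupKeyCache =
           new ConcurrentHashMap<>();
   
       String buildTopicGroupKey(String topic, String group) {
           ConcurrentHashMap<String, String> groupMap = topicGroupKeyCache.get(topic);
           if (groupMap != null) {
               String cached = groupMap.get(group);
               if (cached != null) {
                   return cached; // cache hit: no new String is built
               }
           }
           // Cache miss: two racing threads may both build the key;
           // the second put() just overwrites it with an equal string.
           String key = topic + TOPIC_GROUP_SEPARATOR + group;
           topicGroupKeyCache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(group, key);
           return key;
       }
   
       public static void main(String[] args) {
           TopicGroupKeyCacheDemo demo = new TopicGroupKeyCacheDemo();
           String topic = "TopicTest"; // hypothetical sample names
           String group = "GroupA";
   
           String k1 = demo.buildTopicGroupKey(topic, group);
           String k2 = demo.buildTopicGroupKey(topic, group);
           System.out.println(k1);       // TopicTest@GroupA
           System.out.println(k1 == k2); // true: the second call returns the cached instance
   
           // topic and group are not compile-time constants, so each concatenation
           // creates a new String object.
           String c1 = topic + TOPIC_GROUP_SEPARATOR + group;
           String c2 = topic + TOPIC_GROUP_SEPARATOR + group;
           System.out.println(c1 == c2); // false
       }
   }
   ```
   
   Identity (`==`) is used here only to make the allocation behavior visible; callers should keep comparing keys with `equals()`.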
   
   **2. PullRequestHoldService — buildKeyCache**
   
   ```java
   private final ConcurrentMap<String, String[]> buildKeyCache = new ConcurrentHashMap<>();
   
   private String buildKey(String topic, int queueId) {
       String[] keys = buildKeyCache.get(topic);
       if (keys != null && queueId >= 0 && queueId < keys.length) {
           String cached = keys[queueId];
           if (cached != null) {
               return cached;
           }
       }
       String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
       // grow array if needed, cache result
       return key;
   }
   ```
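   
   The snippet above leaves the growth step as a comment. Below is one possible way to fill it in, sketched under assumptions: the standalone class, the initial capacity of 8, the doubling policy, and the `MAX_CACHED_QUEUE_ID` cap are all hypothetical and do not come from the issue. The idea is to grow the array copy-on-grow inside `ConcurrentHashMap.compute()`, which serializes writers for a topic while keeping the read path lock-free.
   
   ```java
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.ConcurrentMap;
   
   // Hypothetical standalone class; the growth policy and cap below are assumptions.
   public class QueueKeyCacheSketch {
       static final String TOPIC_QUEUEID_SEPARATOR = "@";
       private static final int INITIAL_SIZE = 8;           // assumed initial slots per topic
       private static final int MAX_CACHED_QUEUE_ID = 1024; // assumed cap against huge arrays
   
       private final ConcurrentMap<String, String[]> buildKeyCache = new ConcurrentHashMap<>();
   
       String buildKey(String topic, int queueId) {
           String[] keys = buildKeyCache.get(topic);
           if (keys != null && queueId >= 0 && queueId < keys.length) {
               String cached = keys[queueId];
               if (cached != null) {
                   return cached;
               }
           }
           String key = topic + TOPIC_QUEUEID_SEPARATOR + queueId;
           if (queueId >= 0 && queueId < MAX_CACHED_QUEUE_ID) {
               // ConcurrentHashMap.compute() is atomic per key, so two writers for the
               // same topic cannot overwrite each other's grown array.
               buildKeyCache.compute(topic, (t, current) -> {
                   String[] target = current;
                   if (target == null || queueId >= target.length) {
                       int base = (target == null) ? INITIAL_SIZE : target.length * 2;
                       String[] grown = new String[Math.max(base, queueId + 1)];
                       if (target != null) {
                           System.arraycopy(target, 0, grown, 0, target.length);
                       }
                       target = grown;
                   }
                   target[queueId] = key;
                   return target;
               });
           }
           return key;
       }
   
       public static void main(String[] args) {
           QueueKeyCacheSketch sketch = new QueueKeyCacheSketch();
           String a = sketch.buildKey("TopicTest", 3);
           System.out.println(a);                                    // TopicTest@3
           System.out.println(a == sketch.buildKey("TopicTest", 3)); // true
           System.out.println(sketch.buildKey("TopicTest", 20));     // TopicTest@20 (array grows from 8 to 21 slots)
           System.out.println(a == sketch.buildKey("TopicTest", 3)); // true: slot 3 survived the growth
       }
   }
   ```
   
   A lock-free reader can race with an in-place slot write and see `null`. That only costs one rebuilt key. Because `String` is immutable, a reader that does see a reference always sees a fully initialized string.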
   
   **3. TransactionMetricsFlushService — busy spin fix**
   
   ```java
   // Before (buggy): waitForRunning only inside if-branch
   if (System.currentTimeMillis() - start > interval) {
       start = System.currentTimeMillis();
       persist();
       waitForRunning(interval);  // only sleeps when flushing!
   }
   
   // After (fixed): always wait, then check
   long interval = brokerController.getBrokerConfig().getTransactionMetricFlushInterval();
   this.waitForRunning(interval);
   if (System.currentTimeMillis() - start > interval) {
       start = System.currentTimeMillis();
       persist();
   }
   ```
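   
   To make the effect of moving `waitForRunning()` concrete, the sketch below runs both loop shapes and counts how often the loop body executes. `FlushLoopSketch`, with its `waitForRunning()` and `wakeup()`, is a hypothetical stand-in built on `Object.wait`/`notifyAll`. It only mimics the contract this issue relies on (a timed wait that `wakeup()` can cut short). It is not RocketMQ's `ServiceThread`, and a comment stands in for `persist()`.
   
   ```java
   // Hypothetical stand-in for RocketMQ's ServiceThread, built on Object.wait/notifyAll.
   public class FlushLoopSketch {
       private final Object lock = new Object();
       private boolean notified = false;          // guarded by lock
       private volatile boolean stopped = false;
       private long iterations = 0;               // read by measure() only after join()
   
       void waitForRunning(long interval) throws InterruptedException {
           synchronized (lock) {
               if (notified) {                    // a wakeup() arrived while we were busy
                   notified = false;
                   return;
               }
               lock.wait(interval);               // ~interval ms, on wakeup(), or spuriously
               notified = false;
           }
       }
   
       void wakeup() {
           synchronized (lock) {
               notified = true;
               lock.notifyAll();
           }
       }
   
       // Buggy shape: the loop only waits after a flush, so it spins in between.
       void runBuggy(long interval) throws InterruptedException {
           long start = System.currentTimeMillis();
           while (!stopped) {
               iterations++;
               if (System.currentTimeMillis() - start > interval) {
                   start = System.currentTimeMillis();
                   // persist() would go here
                   waitForRunning(interval);
               }
           }
       }
   
       // Fixed shape from the issue: always wait first, then check whether a flush is due.
       void runFixed(long interval) throws InterruptedException {
           long start = System.currentTimeMillis();
           while (!stopped) {
               iterations++;
               waitForRunning(interval);
               if (System.currentTimeMillis() - start > interval) {
                   start = System.currentTimeMillis();
                   // persist() would go here
               }
           }
       }
   
       // Runs one loop shape for runMillis and returns how many times the loop body ran.
       static long measure(boolean buggy, long interval, long runMillis) {
           FlushLoopSketch s = new FlushLoopSketch();
           Thread worker = new Thread(() -> {
               try {
                   if (buggy) {
                       s.runBuggy(interval);
                   } else {
                       s.runFixed(interval);
                   }
               } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
               }
           });
           worker.start();
           try {
               Thread.sleep(runMillis);
               s.stopped = true;
               s.wakeup();                        // cut short any wait in progress
               worker.join();
           } catch (InterruptedException e) {
               throw new IllegalStateException(e);
           }
           return s.iterations;
       }
   
       public static void main(String[] args) {
           System.out.println("buggy loop iterations: " + measure(true, 50, 300));
           System.out.println("fixed loop iterations: " + measure(false, 50, 300));
       }
   }
   ```
   
   Exact counts depend on the machine and scheduler, so no output is shown. In the buggy shape, the whole first interval is a tight spin. The fixed shape runs the loop body roughly once per interval.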
   
   ### Describe Alternatives You've Considered
   
   - **`String.intern()` for key caching**: Rejected because of contention on the global String table and unpredictable GC behavior.
   - **A single `ConcurrentHashMap<String, String>` for all keys**: Less cache-friendly than per-topic arrays, and it still requires constructing a string key for every lookup.
   - **Using `Thread.sleep()` instead of `waitForRunning()`**: `waitForRunning()` is the `ServiceThread` pattern that supports graceful shutdown via `wakeup()`, whereas `Thread.sleep()` would delay shutdown.
   
   ### Additional Context
   
   Files changed:
   - `broker/.../offset/ConsumerOffsetManager.java`: `buildTopicGroupKey()` plus the two-level `ConcurrentHashMap` cache; 10 call sites migrated
   - `broker/.../longpolling/PullRequestHoldService.java`: `buildKeyCache` with one `String[]` per topic
   - `broker/.../transaction/TransactionMetricsFlushService.java`: `waitForRunning()` moved before the `if` branch
   
   Commercial version compatibility verified: `ConsumerOffsetManagerProxy` only overrides `configFilePath()`, so the new private methods introduce no override conflicts.
   

