Copilot commented on code in PR #10528:
URL: https://github.com/apache/rocketmq/pull/10528#discussion_r3429301864
##########
store/src/main/java/org/apache/rocketmq/store/index/IndexService.java:
##########
@@ -49,6 +49,7 @@ public class IndexService implements CommitLogDispatchStore {
private final String storePath;
private final ArrayList<IndexFile> indexFileList = new ArrayList<>();
private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+ private final StringBuilder reusableKeyBuilder = new StringBuilder(128);
Review Comment:
Sharing one `StringBuilder` field (`reusableKeyBuilder`) across calls makes
`buildKey(...)` not thread-safe. If `IndexService` methods are called
concurrently, keys can be corrupted by races on the shared builder. Prefer
a local `StringBuilder`, a `ThreadLocal<StringBuilder>`, or guard builder
access (e.g., with `synchronized`).
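A minimal sketch of the local-builder option, for illustration only. The class name `IndexKeySketch` and the initial capacity of 64 are assumptions, not part of the PR; only the key layout (`topic#key`, `topic#indexType#key`) is taken from the diff.

```java
// Hypothetical stand-in for the key-building part of IndexService.
final class IndexKeySketch {

    // A fresh builder per call is confined to the calling thread,
    // so there is no shared mutable state to race on.
    static String buildKey(final String topic, final String key) {
        return new StringBuilder(64) // capacity is an assumed tuning value
            .append(topic).append('#').append(key)
            .toString();
    }

    static String buildKey(final String topic, final String key, final String indexType) {
        return new StringBuilder(64)
            .append(topic).append('#').append(indexType).append('#').append(key)
            .toString();
    }
}
```

Like the original string concatenation, `StringBuilder.append(String)` renders a `null` argument as the text `null`, so null tolerance is unchanged.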
##########
store/src/main/java/org/apache/rocketmq/store/index/IndexService.java:
##########
@@ -215,10 +216,12 @@ public QueryOffsetResult queryOffset(String topic, String
key, int maxNum, long
}
private String buildKey(final String topic, final String key) {
- return topic + "#" + key;
+ reusableKeyBuilder.setLength(0);
+ return reusableKeyBuilder.append(topic).append('#').append(key).toString();
}
private String buildKey(final String topic, final String key, final String indexType) {
- return topic + "#" + indexType + "#" + key;
+ reusableKeyBuilder.setLength(0);
+ return reusableKeyBuilder.append(topic).append('#').append(indexType).append('#').append(key).toString();
Review Comment:
Sharing one `StringBuilder` field (`reusableKeyBuilder`) across calls makes
`buildKey(...)` not thread-safe. If `IndexService` methods are called
concurrently, keys can be corrupted by races on the shared builder. Prefer
a local `StringBuilder`, a `ThreadLocal<StringBuilder>`, or guard builder
access (e.g., with `synchronized`).
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -464,47 +470,92 @@ public void incGroupAckNums(final String group, final
String topic, final int in
}
public String buildStatsKey(String topic, String group) {
- StringBuilder strBuilder;
if (topic != null && group != null) {
- strBuilder = new StringBuilder(topic.length() + group.length() + 1);
- } else {
- strBuilder = new StringBuilder();
+ ConcurrentHashMap<String, String> groupMap = statsKeyByGroupCache.get(topic);
+ if (groupMap != null) {
+ String cached = groupMap.get(group);
+ if (cached != null) {
+ return cached;
+ }
+ }
+ String key = topic + "@" + group;
+ statsKeyByGroupCache.computeIfAbsent(topic, t -> new ConcurrentHashMap<>()).put(group, key);
+ return key;
}
- strBuilder.append(topic).append("@").append(group);
- return strBuilder.toString();
+ return topic + "@" + group;
}
public String buildStatsKey(String topic, int queueId) {
- StringBuilder strBuilder;
- if (topic != null) {
- strBuilder = new StringBuilder(topic.length() + 5);
- } else {
- strBuilder = new StringBuilder();
+ String[] keys = statsKeyByQueueCache.get(topic);
+ if (keys != null && queueId >= 0 && queueId < keys.length) {
+ String cached = keys[queueId];
+ if (cached != null) {
+ return cached;
+ }
}
- strBuilder.append(topic).append("@").append(queueId);
- return strBuilder.toString();
+ String key = topic + "@" + queueId;
Review Comment:
`ConcurrentHashMap` does not permit `null` keys. `buildStatsKey(String,
int)` and `getLatencyKey(...)` now call `...Cache.get(topic)` without guarding
`topic != null`, which can throw `NullPointerException` where the previous
implementation tolerated `topic == null`. Add an early return for `topic ==
null` before accessing the cache (or skip caching in that case).
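One way the null guard could look, as a rough sketch rather than the PR's actual code. The class name `StatsKeyCacheSketch` and the bound `MAX_CACHED_QUEUES` are assumptions made for the example; the field name `statsKeyByQueueCache` and the `topic@queueId` layout come from the diff.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the queue-key cache in BrokerStatsManager.
final class StatsKeyCacheSketch {
    // Assumed upper bound on cached queue ids per topic.
    private static final int MAX_CACHED_QUEUES = 64;

    private final ConcurrentHashMap<String, String[]> statsKeyByQueueCache =
        new ConcurrentHashMap<>();

    String buildStatsKey(final String topic, final int queueId) {
        // ConcurrentHashMap rejects null keys, so bypass the cache for a null
        // topic (and for out-of-range queue ids) and build the key directly.
        if (topic == null || queueId < 0 || queueId >= MAX_CACHED_QUEUES) {
            return topic + "@" + queueId;
        }
        String[] keys = statsKeyByQueueCache.computeIfAbsent(
            topic, t -> new String[MAX_CACHED_QUEUES]);
        String cached = keys[queueId];
        if (cached == null) {
            cached = topic + "@" + queueId;
            // Racing writers store equal, immutable strings, so the
            // unsynchronized array write cannot corrupt the key.
            keys[queueId] = cached;
        }
        return cached;
    }
}
```

With this guard a `null` topic yields `null@<queueId>` again, matching the behaviour of the removed `StringBuilder` version.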
##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -301,6 +310,7 @@ public void putSlot(long timeMs, long firstPos, long
lastPos, int num, int magic
localBuffer.get().putLong(lastPos);
localBuffer.get().putInt(num);
localBuffer.get().putInt(magic);
+ dirty = true;
Review Comment:
The `dirty` flag can lose updates under concurrency: during `flush()`,
another thread can set `dirty = true` after the `if (!dirty)` check but before
`dirty = false` at the end, causing the write to be missed and the wheel state
not persisted. Use an atomic state transition (e.g.,
`AtomicBoolean.compareAndSet(true, false)` at flush start) or
synchronize/serialize modifications and flush so `dirty` cannot be cleared
while writes are in-flight.
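The suggested atomic transition could look roughly like the sketch below. `DirtyFlagSketch`, `markDirty()`, and the `flushCount` counter are illustrative assumptions; the counter stands in for the buffer copy and `force()` call in the real `flush()`.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical reduction of the TimerWheel dirty-flag handling.
final class DirtyFlagSketch {
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private int flushCount; // stand-in for "copy buffer and force to disk"

    // Writers call this after they have modified the in-memory buffer.
    void markDirty() {
        dirty.set(true);
    }

    // Returns true if a flush was actually performed.
    boolean flush() {
        // Clear the flag before copying. A write that lands while the copy
        // is in progress sets the flag again and is picked up by the next
        // flush instead of being erased by a late "dirty = false".
        if (!dirty.compareAndSet(true, false)) {
            return false;
        }
        flushCount++;
        return true;
    }

    int flushCount() {
        return flushCount;
    }
}
```

The worst case with this ordering is one redundant flush, which is cheaper than a lost write. If the real copy or `force()` can fail, the flag would need to be set back to `true` on the failure path.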
##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -313,11 +323,13 @@ public void reviseSlot(long timeMs, long firstPos, long
lastPos, boolean force)
} else {
if (IGNORE != firstPos) {
localBuffer.get().putLong(firstPos);
+ dirty = true;
Review Comment:
The `dirty` flag can lose updates under concurrency: during `flush()`,
another thread can set `dirty = true` after the `if (!dirty)` check but before
`dirty = false` at the end, causing the write to be missed and the wheel state
not persisted. Use an atomic state transition (e.g.,
`AtomicBoolean.compareAndSet(true, false)` at flush start) or
synchronize/serialize modifications and flush so `dirty` cannot be cleared
while writes are in-flight.
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
- public void incQueuePutNums(final String topic, final Integer queueId) {
+ public void incQueuePutNums(final String topic, final int queueId) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic,
queueId), 1, 1);
}
}
- public void incQueuePutNums(final String topic, final Integer queueId, int
num, int times) {
+ public void incQueuePutNums(final String topic, final int queueId, int
num, int times) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic,
queueId), num, times);
}
}
- public void incQueuePutSize(final String topic, final Integer queueId,
final int size) {
+ public void incQueuePutSize(final String topic, final int queueId, final
int size) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_SIZE).addValue(buildStatsKey(topic,
queueId), size, 1);
}
}
- public void incQueueGetNums(final String group, final String topic, final
Integer queueId, final int incValue) {
+ public void incQueueGetNums(final String group, final String topic, final
int queueId, final int incValue) {
Review Comment:
Changing public method parameters from `Integer` to primitive `int` changes
the method descriptor, so callers compiled against the old signature fail to
link (`NoSuchMethodError`) until they are recompiled. It also removes the
ability to pass `null` (which previously produced a string key like
`topic@null`). If this is a public API, consider keeping overloads with
`Integer` (possibly deprecated) or performing the migration in a
compatibility-friendly way.
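A sketch of the overload approach, under the assumption that the old `null` behaviour is worth preserving. `QueueStatsSketch` and its `lastKey` field are hypothetical; `lastKey` only records the key that the real method would pass to `addValue(...)`.

```java
// Hypothetical reduction of the incQueuePutNums overloads.
final class QueueStatsSketch {
    String lastKey; // records the stats key the real code would update

    // New primitive signature used by recompiled callers.
    void incQueuePutNums(final String topic, final int queueId) {
        lastKey = topic + "@" + queueId;
    }

    // Old boxed signature kept so existing binaries still link.
    @Deprecated
    void incQueuePutNums(final String topic, final Integer queueId) {
        if (queueId == null) {
            // Preserve the previous "topic@null" key instead of unboxing a null.
            lastKey = topic + "@null";
            return;
        }
        incQueuePutNums(topic, queueId.intValue());
    }
}
```

Overload resolution prefers the `int` method for `int` arguments and the `Integer` method for boxed or `null` arguments, so both kinds of caller keep working.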
##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -313,11 +323,13 @@ public void reviseSlot(long timeMs, long firstPos, long
lastPos, boolean force)
} else {
if (IGNORE != firstPos) {
localBuffer.get().putLong(firstPos);
+ dirty = true;
} else {
localBuffer.get().getLong();
}
if (IGNORE != lastPos) {
localBuffer.get().putLong(lastPos);
+ dirty = true;
Review Comment:
The `dirty` flag can lose updates under concurrency: during `flush()`,
another thread can set `dirty = true` after the `if (!dirty)` check but before
`dirty = false` at the end, causing the write to be missed and the wheel state
not persisted. Use an atomic state transition (e.g.,
`AtomicBoolean.compareAndSet(true, false)` at flush start) or
synchronize/serialize modifications and flush so `dirty` cannot be cleared
while writes are in-flight.
##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -55,6 +55,8 @@ protected ByteBuffer initialValue() {
};
private final int wheelLength;
+ private volatile boolean dirty;
Review Comment:
The `dirty` flag can lose updates under concurrency: during `flush()`,
another thread can set `dirty = true` after the `if (!dirty)` check but before
`dirty = false` at the end, causing the write to be missed and the wheel state
not persisted. Use an atomic state transition (e.g.,
`AtomicBoolean.compareAndSet(true, false)` at flush start) or
synchronize/serialize modifications and flush so `dirty` cannot be cleared
while writes are in-flight.
##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -128,17 +130,25 @@ public void flush() {
if (mappedByteBuffer == null) {
return;
}
+ if (!dirty) {
+ return;
+ }
ByteBuffer bf = localBuffer.get();
- bf.position(0);
- bf.limit(wheelLength);
- mappedByteBuffer.position(0);
- mappedByteBuffer.limit(wheelLength);
- for (int i = 0; i < wheelLength; i++) {
- if (bf.get(i) != mappedByteBuffer.get(i)) {
- mappedByteBuffer.put(i, bf.get(i));
+ int longAligned = wheelLength & ~7;
Review Comment:
The `dirty` flag can lose updates under concurrency: during `flush()`,
another thread can set `dirty = true` after the `if (!dirty)` check but before
`dirty = false` at the end, causing the write to be missed and the wheel state
not persisted. Use an atomic state transition (e.g.,
`AtomicBoolean.compareAndSet(true, false)` at flush start) or
synchronize/serialize modifications and flush so `dirty` cannot be cleared
while writes are in-flight.
##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -287,11 +297,10 @@ public int getSlotIndex(long timeMs) {
public void putSlot(long timeMs, long firstPos, long lastPos) {
localBuffer.get().position(getSlotIndex(timeMs) * Slot.SIZE);
- // To be compatible with previous version.
- // The previous version's precision is fixed at 1000ms and it store timeMs / 1000 in slot.
localBuffer.get().putLong(timeMs / precisionMs);
localBuffer.get().putLong(firstPos);
localBuffer.get().putLong(lastPos);
+ dirty = true;
Review Comment:
The `dirty` flag can lose updates under concurrency: during `flush()`,
another thread can set `dirty = true` after the `if (!dirty)` check but before
`dirty = false` at the end, causing the write to be missed and the wheel state
not persisted. Use an atomic state transition (e.g.,
`AtomicBoolean.compareAndSet(true, false)` at flush start) or
synchronize/serialize modifications and flush so `dirty` cannot be cleared
while writes are in-flight.
##########
store/src/main/java/org/apache/rocketmq/store/timer/TimerWheel.java:
##########
@@ -128,17 +130,25 @@ public void flush() {
if (mappedByteBuffer == null) {
return;
}
+ if (!dirty) {
+ return;
+ }
ByteBuffer bf = localBuffer.get();
- bf.position(0);
- bf.limit(wheelLength);
- mappedByteBuffer.position(0);
- mappedByteBuffer.limit(wheelLength);
- for (int i = 0; i < wheelLength; i++) {
- if (bf.get(i) != mappedByteBuffer.get(i)) {
- mappedByteBuffer.put(i, bf.get(i));
+ int longAligned = wheelLength & ~7;
+ for (int i = 0; i < longAligned; i += 8) {
+ long local = bf.getLong(i);
+ if (local != mappedByteBuffer.getLong(i)) {
+ mappedByteBuffer.putLong(i, local);
+ }
+ }
+ for (int i = longAligned; i < wheelLength; i++) {
+ byte b = bf.get(i);
+ if (b != mappedByteBuffer.get(i)) {
+ mappedByteBuffer.put(i, b);
}
}
this.mappedByteBuffer.force();
+ dirty = false;
Review Comment:
The `dirty` flag can lose updates under concurrency: during `flush()`,
another thread can set `dirty = true` after the `if (!dirty)` check but before
`dirty = false` at the end, causing the write to be missed and the wheel state
not persisted. Use an atomic state transition (e.g.,
`AtomicBoolean.compareAndSet(true, false)` at flush start) or
synchronize/serialize modifications and flush so `dirty` cannot be cleared
while writes are in-flight.
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
- public void incQueuePutNums(final String topic, final Integer queueId) {
+ public void incQueuePutNums(final String topic, final int queueId) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic,
queueId), 1, 1);
}
}
- public void incQueuePutNums(final String topic, final Integer queueId, int
num, int times) {
+ public void incQueuePutNums(final String topic, final int queueId, int
num, int times) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic,
queueId), num, times);
}
}
- public void incQueuePutSize(final String topic, final Integer queueId,
final int size) {
+ public void incQueuePutSize(final String topic, final int queueId, final
int size) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_SIZE).addValue(buildStatsKey(topic,
queueId), size, 1);
}
}
- public void incQueueGetNums(final String group, final String topic, final
Integer queueId, final int incValue) {
+ public void incQueueGetNums(final String group, final String topic, final
int queueId, final int incValue) {
if (enableQueueStat) {
final String statsKey = buildStatsKey(topic, queueId, group);
this.statsTable.get(Stats.QUEUE_GET_NUMS).addValue(statsKey,
incValue, 1);
}
}
- public void incQueueGetSize(final String group, final String topic, final
Integer queueId, final int incValue) {
+ public void incQueueGetSize(final String group, final String topic, final
int queueId, final int incValue) {
Review Comment:
Changing public method parameters from `Integer` to primitive `int` changes
the method descriptor, so callers compiled against the old signature fail to
link (`NoSuchMethodError`) until they are recompiled. It also removes the
ability to pass `null` (which previously produced a string key like
`topic@null`). If this is a public API, consider keeping overloads with
`Integer` (possibly deprecated) or performing the migration in a
compatibility-friendly way.
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -523,14 +574,30 @@ public void incGroupGetLatency(final String group, final
String topic, final int
}
public void incTopicPutLatency(final String topic, final int queueId,
final int incValue) {
- StringBuilder statsKey;
+ String statsKey = getLatencyKey(topic, queueId);
+ this.statsTable.get(Stats.TOPIC_PUT_LATENCY).addValue(statsKey,
incValue, 1);
+ }
+
+ private String getLatencyKey(final String topic, final int queueId) {
+ String[] keys = latencyKeyCache.get(topic);
+ if (keys != null && queueId < keys.length) {
+ String k = keys[queueId];
+ if (k != null) {
+ return k;
+ }
+ }
Review Comment:
`ConcurrentHashMap` does not permit `null` keys. `buildStatsKey(String,
int)` and `getLatencyKey(...)` now call `...Cache.get(topic)` without guarding
`topic != null`, which can throw `NullPointerException` where the previous
implementation tolerated `topic == null`. Add an early return for `topic ==
null` before accessing the cache (or skip caching in that case).
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
- public void incQueuePutNums(final String topic, final Integer queueId) {
+ public void incQueuePutNums(final String topic, final int queueId) {
Review Comment:
Changing public method parameters from `Integer` to primitive `int` changes
the method descriptor, so callers compiled against the old signature fail to
link (`NoSuchMethodError`) until they are recompiled. It also removes the
ability to pass `null` (which previously produced a string key like
`topic@null`). If this is a public API, consider keeping overloads with
`Integer` (possibly deprecated) or performing the migration in a
compatibility-friendly way.
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
- public void incQueuePutNums(final String topic, final Integer queueId) {
+ public void incQueuePutNums(final String topic, final int queueId) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic,
queueId), 1, 1);
}
}
- public void incQueuePutNums(final String topic, final Integer queueId, int
num, int times) {
+ public void incQueuePutNums(final String topic, final int queueId, int
num, int times) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic,
queueId), num, times);
}
}
- public void incQueuePutSize(final String topic, final Integer queueId,
final int size) {
+ public void incQueuePutSize(final String topic, final int queueId, final
int size) {
Review Comment:
Changing public method parameters from `Integer` to primitive `int` changes
the method descriptor, so callers compiled against the old signature fail to
link (`NoSuchMethodError`) until they are recompiled. It also removes the
ability to pass `null` (which previously produced a string key like
`topic@null`). If this is a public API, consider keeping overloads with
`Integer` (possibly deprecated) or performing the migration in a
compatibility-friendly way.
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -141,6 +142,11 @@ public class BrokerStatsManager {
private ScheduledExecutorService cleanResourceExecutor;
private final HashMap<String, StatsItemSet> statsTable = new HashMap<>();
+ private final ConcurrentHashMap<String, String[]> latencyKeyCache = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, String[]> statsKeyByQueueCache = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> statsKeyByGroupCache = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, String[]>> statsKeyByTopicQueueGroupCache = new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String, ConcurrentHashMap<String, String[]>> statsKeyByQueueTopicGroupCache = new ConcurrentHashMap<>();
Review Comment:
These caches are unbounded and appear to retain entries indefinitely per
topic/group/queue, which can lead to memory growth in long-running brokers with
high churn or many unique groups/topics. Consider clearing relevant cache
entries in existing lifecycle hooks (e.g., on topic/group deletion) and/or
introducing size bounds/eviction to prevent unbounded retention.
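As a sketch of the lifecycle-hook option, cache entries could be dropped when a topic or group goes away. `StatsKeyCacheCleanupSketch` and `cachedTopics()` are hypothetical; `onGroupDeleted` mirrors the hook visible in the diff, while `onTopicDeleted` is assumed to exist in a similar form.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical reduction of the topic/group key cache with cleanup hooks.
final class StatsKeyCacheCleanupSketch {
    private final ConcurrentHashMap<String, ConcurrentHashMap<String, String>> statsKeyByGroupCache =
        new ConcurrentHashMap<>();

    String buildStatsKey(final String topic, final String group) {
        if (topic == null || group == null) {
            return topic + "@" + group; // null keys are not cacheable
        }
        return statsKeyByGroupCache
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
            .computeIfAbsent(group, g -> topic + "@" + g);
    }

    // Assumed hook: drop every cached key for a deleted topic.
    void onTopicDeleted(final String topic) {
        if (topic != null) {
            statsKeyByGroupCache.remove(topic);
        }
    }

    // Drop the deleted group's key under every topic.
    void onGroupDeleted(final String group) {
        if (group == null) {
            return;
        }
        for (ConcurrentHashMap<String, String> byGroup : statsKeyByGroupCache.values()) {
            byGroup.remove(group);
        }
    }

    int cachedTopics() {
        return statsKeyByGroupCache.size();
    }
}
```

Hook-based cleanup only bounds the cache by the number of live topics and groups; a hard size limit would still need a separate eviction policy.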
##########
store/src/main/java/org/apache/rocketmq/store/stats/BrokerStatsManager.java:
##########
@@ -380,32 +386,32 @@ public void onGroupDeleted(final String group) {
this.momentStatsItemSetFallTime.delValueBySuffixKey(group, "@");
}
- public void incQueuePutNums(final String topic, final Integer queueId) {
+ public void incQueuePutNums(final String topic, final int queueId) {
if (enableQueueStat) {
this.statsTable.get(Stats.QUEUE_PUT_NUMS).addValue(buildStatsKey(topic,
queueId), 1, 1);
}
}
- public void incQueuePutNums(final String topic, final Integer queueId, int
num, int times) {
+ public void incQueuePutNums(final String topic, final int queueId, int
num, int times) {
Review Comment:
Changing public method parameters from `Integer` to primitive `int` changes
the method descriptor, so callers compiled against the old signature fail to
link (`NoSuchMethodError`) until they are recompiled. It also removes the
ability to pass `null` (which previously produced a string key like
`topic@null`). If this is a public API, consider keeping overloads with
`Integer` (possibly deprecated) or performing the migration in a
compatibility-friendly way.