This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 8505482c0b fix: avoid memory overhead when there is a large number of
LMQ ConsumeQueues (#8956)
8505482c0b is described below
commit 8505482c0b05b6dceb2e3a372bd8c9848c26c244
Author: Zhanhui Li <[email protected]>
AuthorDate: Wed Nov 20 14:55:30 2024 +0800
fix: avoid memory overhead when there is a large number of LMQ ConsumeQueues
(#8956)
---
.../apache/rocketmq/store/queue/AbstractConsumeQueueStore.java | 6 +++++-
.../org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java | 8 +++++++-
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
index dfce665d8f..ef693dc1e6 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/AbstractConsumeQueueStore.java
@@ -39,7 +39,11 @@ public abstract class AbstractConsumeQueueStore implements
ConsumeQueueStoreInte
public AbstractConsumeQueueStore(DefaultMessageStore messageStore) {
this.messageStore = messageStore;
this.messageStoreConfig = messageStore.getMessageStoreConfig();
- this.consumeQueueTable = new ConcurrentHashMap<>(32);
+ if (messageStoreConfig.isEnableLmq()) {
+ this.consumeQueueTable = new ConcurrentHashMap<>(32_768);
+ } else {
+ this.consumeQueueTable = new ConcurrentHashMap<>(32);
+ }
}
@Override
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 67a0015743..0242ec2309 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -480,7 +480,13 @@ public class RocksDBConsumeQueueStore extends
AbstractConsumeQueueStore {
public ConsumeQueueInterface findOrCreateConsumeQueue(String topic, int
queueId) {
ConcurrentMap<Integer, ConsumeQueueInterface> map =
this.consumeQueueTable.get(topic);
if (null == map) {
- ConcurrentMap<Integer, ConsumeQueueInterface> newMap = new
ConcurrentHashMap<>(128);
+ ConcurrentMap<Integer, ConsumeQueueInterface> newMap;
+ if (MixAll.isLmq(topic)) {
+ // For LMQ, no need to over allocate internal hashtable
+ newMap = new ConcurrentHashMap<>(1, 1.0F);
+ } else {
+ newMap = new ConcurrentHashMap<>(8);
+ }
ConcurrentMap<Integer, ConsumeQueueInterface> oldMap =
this.consumeQueueTable.putIfAbsent(topic, newMap);
if (oldMap != null) {
map = oldMap;