This is an automated email from the ASF dual-hosted git repository.
kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new a9e353285c Add the configuration of topicQueueLock number to better support different scenarios (#7317)
a9e353285c is described below
commit a9e353285cea762b0c5eab567bdfa8e5c8c2d279
Author: rongtong <[email protected]>
AuthorDate: Mon Sep 11 15:55:18 2023 +0800
Add the configuration of topicQueueLock number to better support different scenarios (#7317)
---
store/src/main/java/org/apache/rocketmq/store/CommitLog.java | 2 +-
.../main/java/org/apache/rocketmq/store/TopicQueueLock.java | 8 ++++++++
.../org/apache/rocketmq/store/config/MessageStoreConfig.java | 10 ++++++++++
3 files changed, 19 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index e6ee3bacc1..456bf2b86f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -122,7 +122,7 @@ public class CommitLog implements Swappable {
this.flushDiskWatcher = new FlushDiskWatcher();
- this.topicQueueLock = new TopicQueueLock();
+ this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
index a78eeed230..5a131b5c35 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
@@ -34,6 +34,14 @@ public class TopicQueueLock {
}
}
+ public TopicQueueLock(int size) {
+ this.size = size;
+ this.lockList = new ArrayList<>(size);
+ for (int i = 0; i < this.size; i++) {
+ this.lockList.add(new ReentrantLock());
+ }
+ }
+
public void lock(String topicQueueKey) {
Lock lock = this.lockList.get((topicQueueKey.hashCode() & 0x7fffffff) % this.size);
lock.lock();
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index efb728ac04..9fa448043a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -401,6 +401,8 @@ public class MessageStoreConfig {
private long memTableFlushInterval = 60 * 60 * 1000L;
private boolean enableRocksDBLog = false;
+ private int topicQueueLockNum = 32;
+
public boolean isDebugLockEnable() {
return debugLockEnable;
}
@@ -1751,4 +1753,12 @@ public class MessageStoreConfig {
public void setEnableRocksDBLog(boolean enableRocksDBLog) {
this.enableRocksDBLog = enableRocksDBLog;
}
+
+ public int getTopicQueueLockNum() {
+ return topicQueueLockNum;
+ }
+
+ public void setTopicQueueLockNum(int topicQueueLockNum) {
+ this.topicQueueLockNum = topicQueueLockNum;
+ }
}