This is an automated email from the ASF dual-hosted git repository.

kaili pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new a9e353285c Add the configuration of topicQueueLock number to better support different scenarios (#7317)
a9e353285c is described below

commit a9e353285cea762b0c5eab567bdfa8e5c8c2d279
Author: rongtong <[email protected]>
AuthorDate: Mon Sep 11 15:55:18 2023 +0800

    Add the configuration of topicQueueLock number to better support different scenarios (#7317)
---
 store/src/main/java/org/apache/rocketmq/store/CommitLog.java   |  2 +-
 .../main/java/org/apache/rocketmq/store/TopicQueueLock.java    |  8 ++++++++
 .../org/apache/rocketmq/store/config/MessageStoreConfig.java   | 10 ++++++++++
 3 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index e6ee3bacc1..456bf2b86f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -122,7 +122,7 @@ public class CommitLog implements Swappable {
 
         this.flushDiskWatcher = new FlushDiskWatcher();
 
-        this.topicQueueLock = new TopicQueueLock();
+        this.topicQueueLock = new TopicQueueLock(messageStore.getMessageStoreConfig().getTopicQueueLockNum());
 
         this.commitLogSize = messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
index a78eeed230..5a131b5c35 100644
--- a/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
+++ b/store/src/main/java/org/apache/rocketmq/store/TopicQueueLock.java
@@ -34,6 +34,14 @@ public class TopicQueueLock {
         }
     }
 
+    public TopicQueueLock(int size) {
+        this.size = size;
+        this.lockList = new ArrayList<>(size);
+        for (int i = 0; i < this.size; i++) {
+            this.lockList.add(new ReentrantLock());
+        }
+    }
+
     public void lock(String topicQueueKey) {
         Lock lock = this.lockList.get((topicQueueKey.hashCode() & 0x7fffffff) % this.size);
         lock.lock();
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index efb728ac04..9fa448043a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -401,6 +401,8 @@ public class MessageStoreConfig {
     private long memTableFlushInterval = 60 * 60 * 1000L;
     private boolean enableRocksDBLog = false;
 
+    private int topicQueueLockNum = 32;
+
     public boolean isDebugLockEnable() {
         return debugLockEnable;
     }
@@ -1751,4 +1753,12 @@ public class MessageStoreConfig {
     public void setEnableRocksDBLog(boolean enableRocksDBLog) {
         this.enableRocksDBLog = enableRocksDBLog;
     }
+
+    public int getTopicQueueLockNum() {
+        return topicQueueLockNum;
+    }
+
+    public void setTopicQueueLockNum(int topicQueueLockNum) {
+        this.topicQueueLockNum = topicQueueLockNum;
+    }
 }

Reply via email to