This is an automated email from the ASF dual-hosted git repository.

zhouxzhan pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit dcf84e4125e63263290d760ec6af03b7552afc71
Author: nowinkey <[email protected]>
AuthorDate: Sat Feb 11 16:13:21 2023 +0800

    Add integration test and add enableBuildConsumeQueueConcurrently attribute to BrokerConfig class
---
 .../main/java/org/apache/rocketmq/common/BrokerConfig.java  | 13 +++++++++++++
 .../java/org/apache/rocketmq/store/DefaultMessageStore.java |  4 ++--
 .../rocketmq/test/smoke/NormalMessageSendAndRecvIT.java     | 11 +++++++++++
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 9bf615f61..3d4994f53 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -401,6 +401,11 @@ public class BrokerConfig extends BrokerIdentity {
      */
     private boolean estimateAccumulation = true;
 
+    /**
+     * Build ConsumeQueue concurrently with multi-thread
+     */
+    private boolean enableBuildConsumeQueueConcurrently = false;
+
     public long getMaxPopPollingSize() {
         return maxPopPollingSize;
     }
@@ -1656,4 +1661,12 @@ public class BrokerConfig extends BrokerIdentity {
     public void setBatchDispatchRequestThreadPoolNums(int batchDispatchRequestThreadPoolNums) {
         this.batchDispatchRequestThreadPoolNums = batchDispatchRequestThreadPoolNums;
     }
+
+    public boolean isEnableBuildConsumeQueueConcurrently() {
+        return enableBuildConsumeQueueConcurrently;
+    }
+
+    public void setEnableBuildConsumeQueueConcurrently(boolean enableBuildConsumeQueueConcurrently) {
+        this.enableBuildConsumeQueueConcurrently = enableBuildConsumeQueueConcurrently;
+    }
 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index ba4b53064..727bc5c00 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -234,7 +234,7 @@ public class DefaultMessageStore implements MessageStore {
             }
         }
 
-        if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+        if (!brokerConfig.isEnableBuildConsumeQueueConcurrently()) {
             this.reputMessageService = new ReputMessageService();
         } else {
             this.reputMessageService = new ConcurrentReputMessageService();
@@ -689,7 +689,7 @@ public class DefaultMessageStore implements MessageStore {
 
         this.recoverTopicQueueTable();
 
-        if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) {
+        if (!brokerConfig.isEnableBuildConsumeQueueConcurrently()) {
             this.reputMessageService = new ReputMessageService();
         } else {
             this.reputMessageService = new ConcurrentReputMessageService();
diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
index caa3cad48..f3b30b5af 100644
--- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
+++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java
@@ -19,6 +19,7 @@ package org.apache.rocketmq.test.smoke;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.Properties;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.message.MessageClientExt;
@@ -108,4 +109,14 @@ public class NormalMessageSendAndRecvIT extends BaseConf {
         }
 
     }
+
+    @Test
+    public void testSynSendMessageWhenEnableBuildConsumeQueueConcurrently() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty("enableBuildConsumeQueueConcurrently", "true");
+        defaultMQAdminExt.updateBrokerConfig(brokerController1.getBrokerAddr(), properties);
+        defaultMQAdminExt.updateBrokerConfig(brokerController2.getBrokerAddr(), properties);
+        defaultMQAdminExt.updateBrokerConfig(brokerController3.getBrokerAddr(), properties);
+        testSynSendMessage();
+    }
 }

Reply via email to