This is an automated email from the ASF dual-hosted git repository. zhouxzhan pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit dcf84e4125e63263290d760ec6af03b7552afc71 Author: nowinkey <[email protected]> AuthorDate: Sat Feb 11 16:13:21 2023 +0800 Add integration test and add enableBuildConsumeQueueConcurrently attribute to BrokerConfig class --- .../main/java/org/apache/rocketmq/common/BrokerConfig.java | 13 +++++++++++++ .../java/org/apache/rocketmq/store/DefaultMessageStore.java | 4 ++-- .../rocketmq/test/smoke/NormalMessageSendAndRecvIT.java | 11 +++++++++++ 3 files changed, 26 insertions(+), 2 deletions(-) diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java index 9bf615f61..3d4994f53 100644 --- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java +++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java @@ -401,6 +401,11 @@ public class BrokerConfig extends BrokerIdentity { */ private boolean estimateAccumulation = true; + /** + * Build ConsumeQueue concurrently with multi-thread + */ + private boolean enableBuildConsumeQueueConcurrently = false; + public long getMaxPopPollingSize() { return maxPopPollingSize; } @@ -1656,4 +1661,12 @@ public class BrokerConfig extends BrokerIdentity { public void setBatchDispatchRequestThreadPoolNums(int batchDispatchRequestThreadPoolNums) { this.batchDispatchRequestThreadPoolNums = batchDispatchRequestThreadPoolNums; } + + public boolean isEnableBuildConsumeQueueConcurrently() { + return enableBuildConsumeQueueConcurrently; + } + + public void setEnableBuildConsumeQueueConcurrently(boolean enableBuildConsumeQueueConcurrently) { + this.enableBuildConsumeQueueConcurrently = enableBuildConsumeQueueConcurrently; + } } diff --git a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java index ba4b53064..727bc5c00 100644 --- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java +++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java @@ -234,7 +234,7 @@ public class DefaultMessageStore implements MessageStore { } } - if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { + if (!brokerConfig.isEnableBuildConsumeQueueConcurrently()) { this.reputMessageService = new ReputMessageService(); } else { this.reputMessageService = new ConcurrentReputMessageService(); @@ -689,7 +689,7 @@ public class DefaultMessageStore implements MessageStore { this.recoverTopicQueueTable(); - if (!messageStoreConfig.isEnableBuildConsumeQueueConcurrently()) { + if (!brokerConfig.isEnableBuildConsumeQueueConcurrently()) { this.reputMessageService = new ReputMessageService(); } else { this.reputMessageService = new ConcurrentReputMessageService(); diff --git a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java index caa3cad48..f3b30b5af 100644 --- a/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java +++ b/test/src/test/java/org/apache/rocketmq/test/smoke/NormalMessageSendAndRecvIT.java @@ -19,6 +19,7 @@ package org.apache.rocketmq.test.smoke; import java.time.Duration; import java.util.List; +import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.message.MessageClientExt; @@ -108,4 +109,14 @@ public class NormalMessageSendAndRecvIT extends BaseConf { } } + + @Test + public void testSynSendMessageWhenEnableBuildConsumeQueueConcurrently() throws Exception { + Properties properties = new Properties(); + properties.setProperty("enableBuildConsumeQueueConcurrently", "true"); + defaultMQAdminExt.updateBrokerConfig(brokerController1.getBrokerAddr(), properties); + defaultMQAdminExt.updateBrokerConfig(brokerController2.getBrokerAddr(), properties); + defaultMQAdminExt.updateBrokerConfig(brokerController3.getBrokerAddr(), properties); + testSynSendMessage(); + } }
