This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 3aa5d1936f Adjust the default value of ackMessageThreadPoolNums to 16
to prevent performance bottlenecks during high traffic. (#8337)
3aa5d1936f is described below
commit 3aa5d1936f1162afefccdf03fcc55bf0f7ee642c
Author: rongtong <[email protected]>
AuthorDate: Tue Jul 2 17:08:54 2024 +0800
Adjust the default value of ackMessageThreadPoolNums to 16 to prevent
performance bottlenecks during high traffic. (#8337)
---
common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java | 2 +-
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 378301bedd..3aac59e0a1 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -70,7 +70,7 @@ public class BrokerConfig extends BrokerIdentity {
private int putMessageFutureThreadPoolNums = Math.min(PROCESSOR_NUMBER, 4);
private int pullMessageThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
private int litePullMessageThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
- private int ackMessageThreadPoolNums = 3;
+ private int ackMessageThreadPoolNums = 16;
private int processReplyMessageThreadPoolNums = 16 + PROCESSOR_NUMBER * 2;
private int queryMessageThreadPoolNums = 8 + PROCESSOR_NUMBER;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 453c9d1dc7..569cc3cfaa 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -816,7 +816,7 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
long currentLogicOffset = mappedFile.getWrotePosition() +
mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) {
- log.warn("Build consume queue repeatedly,
expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
+ log.warn("Build consume queue repeatedly,
expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",
expectLogicOffset, currentLogicOffset, this.topic,
this.queueId, expectLogicOffset - currentLogicOffset);
return true;
}