This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 24ca9e46b5 [ISSUE #10043] Make TimerMessageReputService thread pool configurable and shutdown gracefully (#10044)
24ca9e46b5 is described below
commit 24ca9e46b50c03f1978e1dca6233d8e8b2e05ade
Author: wizcraft_kris <[email protected]>
AuthorDate: Fri Jan 30 10:04:40 2026 +0800
[ISSUE #10043] Make TimerMessageReputService thread pool configurable and shutdown gracefully (#10044)
---
.../TransactionalMessageRocksDBService.java | 6 ++---
.../org/apache/rocketmq/common/BrokerConfig.java | 30 ++++++++++++++++++++++
.../rocketmq/store/config/MessageStoreConfig.java | 27 +++++++++++++++++++
.../timer/rocksdb/TimerMessageRocksDBStore.java | 25 ++++++++++++------
4 files changed, 77 insertions(+), 11 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
index 1fc38eb3d6..dbd3575d69 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/transaction/rocksdb/TransactionalMessageRocksDBService.java
@@ -76,11 +76,11 @@ public class TransactionalMessageRocksDBService {
private void initService() {
this.transStatusService = new TransStatusCheckService();
this.checkTranStatusTaskExecutor = ThreadUtils.newThreadPoolExecutor(
- 2,
- 5,
+ brokerController.getBrokerConfig().getTransactionCheckRocksdbCoreThreads(),
+ brokerController.getBrokerConfig().getTransactionCheckRocksdbMaxThreads(),
100,
TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(2000),
+ new ArrayBlockingQueue<>(brokerController.getBrokerConfig().getTransactionCheckRocksdbQueueCapacity()),
new ThreadFactoryImpl("Transaction-rocksdb-msg-check-thread",
brokerController.getBrokerIdentity()),
new CallerRunsPolicy());
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index e9c588e9d1..caee5e45f2 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -298,6 +298,12 @@ public class BrokerConfig extends BrokerIdentity {
private long transactionMetricFlushInterval = 10 * 1000;
+ private int transactionCheckRocksdbCoreThreads = 2;
+
+ private int transactionCheckRocksdbMaxThreads = 5;
+
+ private int transactionCheckRocksdbQueueCapacity = 2000;
+
/**
* transaction batch op message
*/
@@ -2106,6 +2112,30 @@ public class BrokerConfig extends BrokerIdentity {
this.transactionMetricFlushInterval = transactionMetricFlushInterval;
}
+ public void setTransactionCheckRocksdbCoreThreads(int transactionCheckRocksdbCoreThreads) {
+ this.transactionCheckRocksdbCoreThreads = transactionCheckRocksdbCoreThreads;
+ }
+
+ public int getTransactionCheckRocksdbCoreThreads() {
+ return transactionCheckRocksdbCoreThreads;
+ }
+
+ public int getTransactionCheckRocksdbMaxThreads() {
+ return transactionCheckRocksdbMaxThreads;
+ }
+
+ public void setTransactionCheckRocksdbMaxThreads(int transactionCheckRocksdbMaxThreads) {
+ this.transactionCheckRocksdbMaxThreads = transactionCheckRocksdbMaxThreads;
+ }
+
+ public int getTransactionCheckRocksdbQueueCapacity() {
+ return transactionCheckRocksdbQueueCapacity;
+ }
+
+ public void setTransactionCheckRocksdbQueueCapacity(int transactionCheckRocksdbQueueCapacity) {
+ this.transactionCheckRocksdbQueueCapacity = transactionCheckRocksdbQueueCapacity;
+ }
+
public long getPopInflightMessageThreshold() {
return popInflightMessageThreshold;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index d7f17efd64..ffc261aa17 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -116,6 +116,9 @@ public class MessageStoreConfig {
private int timerRocksDBRollRangeHours = 2;
private boolean timerRecallToTimeWheelEnable = true;
private boolean timerRecallToTimelineEnable = true;
+ private int timerReputServiceCorePoolSize = 6;
+ private int timerReputServiceMaxPoolSize = 6;
+ private int timerReputServiceQueueCapacity = 10000;
private boolean transRocksDBEnable = false;
private boolean transWriteOriginTransHalfEnable = true;
@@ -2227,6 +2230,30 @@ public class MessageStoreConfig {
this.timerRecallToTimelineEnable = timerRecallToTimelineEnable;
}
+ public void setTimerReputServiceCorePoolSize(int timerReputServiceCorePoolSize) {
+ this.timerReputServiceCorePoolSize = timerReputServiceCorePoolSize;
+ }
+
+ public int getTimerReputServiceCorePoolSize() {
+ return timerReputServiceCorePoolSize;
+ }
+
+ public void setTimerReputServiceMaxPoolSize(int timerReputServiceMaxPoolSize) {
+ this.timerReputServiceMaxPoolSize = timerReputServiceMaxPoolSize;
+ }
+
+ public int getTimerReputServiceMaxPoolSize() {
+ return timerReputServiceMaxPoolSize;
+ }
+
+ public void setTimerReputServiceQueueCapacity(int timerReputServiceQueueCapacity) {
+ this.timerReputServiceQueueCapacity = timerReputServiceQueueCapacity;
+ }
+
+ public int getTimerReputServiceQueueCapacity() {
+ return timerReputServiceQueueCapacity;
+ }
+
public int getTimerRocksDBRollIntervalHours() {
return timerRocksDBRollIntervalHours;
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
index ec13971d92..c48e177c9d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/timer/rocksdb/TimerMessageRocksDBStore.java
@@ -42,6 +42,7 @@ import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.utils.ThreadUtils;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
@@ -506,14 +507,16 @@ public class TimerMessageRocksDBStore {
private final BlockingQueue<List<TimerRocksDBRecord>> queue;
private final RateLimiter rateLimiter;
private final boolean writeCheckPoint;
- ExecutorService executor = new ThreadPoolExecutor(
- 6,
- 6,
- 60,
- TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(10000),
- new ThreadPoolExecutor.CallerRunsPolicy()
- );
+ private final ExecutorService executor =
+ ThreadUtils.newThreadPoolExecutor(
+ storeConfig.getTimerReputServiceCorePoolSize(),
+ storeConfig.getTimerReputServiceMaxPoolSize(),
+ 60L,
+ TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(storeConfig.getTimerReputServiceQueueCapacity()),
+ ThreadUtils.newGenericThreadFactory("TimerMessageReputService", false),
+ new ThreadPoolExecutor.CallerRunsPolicy()
+ );
public TimerMessageReputService(BlockingQueue<List<TimerRocksDBRecord>> queue, double maxTps, boolean writeCheckPoint) {
this.queue = queue;
@@ -614,6 +617,12 @@ public class TimerMessageRocksDBStore {
return null;
}
}
+
+ @Override
+ public void shutdown() {
+ super.shutdown();
+ ThreadUtils.shutdownGracefully(executor, 5, TimeUnit.SECONDS);
+ }
}
}